diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index f6a33bc26a..bed0235d8e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -36,7 +36,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -102,12 +104,20 @@ public void targetAndSourceHaveDifferentEncryptionZoneKeys() throws Throwable {
           put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
         }}, "test_key123");
 
+    List<String> dumpWithClause = Arrays.asList(
+        "'hive.repl.add.raw.reserved.namespace'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+            + replica.externalTableWarehouseRoot + "'",
+        "'distcp.options.skipcrccheck'=''",
+        "'" + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + "'='false'",
+        "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+            + UserGroupInformation.getCurrentUser().getUserName() + "'");
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
         .run("create table encrypted_table (id int, value string)")
         .run("insert into table encrypted_table values (1,'value1')")
        .run("insert into table encrypted_table values (2,'value2')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, dumpWithClause);
 
     replica
         .run("repl load " + primaryDbName + " into " + replicatedDbName
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c46103a254..66b652871c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -92,7 +92,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -371,8 +370,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu
     confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb,
-        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
-        Collections.emptyList());
+        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
     replLoadTask.executeTask(null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 1ba8003384..df913f303d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -139,6 +139,7 @@ public void replicationWithoutExternalTables() throws Throwable {
 
   @Test
   public void externalTableReplicationWithDefaultPaths() throws Throwable {
+    List<String> withClauseOptions = externalTableBasePathWithClause();
     //creates external tables with partitions
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -149,12 +150,12 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
         .run("insert into table t2 partition(country='india') values ('bangalore')")
         .run("insert into table t2 partition(country='us') values ('austin')")
         .run("insert into table t2 partition(country='france') values ('paris')")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for bootstrap
     assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName);
 
-    List<String> withClauseOptions = externalTableBasePathWithClause();
+
     replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
@@ -181,7 +182,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable {
         .run("create external table t3 (id int)")
         .run("insert into table t3 values (10)")
         .run("create external table t4 as select id from t3")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for incremental
     assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation);
@@ -245,7 +246,7 @@ public void externalTableReplicationWithCustomPaths() throws Throwable {
     // Create base directory but use HDFS path without schema or authority details.
     // Hive should pick up the local cluster's HDFS schema/authority.
     externalTableBasePathWithClause();
-    List<String> loadWithClause = Arrays.asList(
+    List<String> withClause = Arrays.asList(
         "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
         "'distcp.options.update'=''"
@@ -255,9 +256,9 @@ public void externalTableReplicationWithCustomPaths() throws Throwable {
         .run("create external table a (i int, j int) "
             + "row format delimited fields terminated by ',' "
             + "location '" + externalTableLocation.toUri() + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 'a'")
         .verifyResults(Collections.singletonList("a"))
@@ -271,11 +272,10 @@
       outputStream.write("1,2\n".getBytes());
       outputStream.write("13,21\n".getBytes());
     }
+    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+        .dump(primaryDbName, withClause);
 
-    primary.run("create table b (i int)")
-        .dump(primaryDbName);
-
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("select i From a")
         .verifyResults(new String[] { "1", "13" })
         .run("select j from a")
@@ -286,9 +286,9 @@
         new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
     primary.run("use " + primaryDbName)
         .run("alter table a set location '" + externalTableLocation + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select i From a")
         .verifyResults(Collections.emptyList());
@@ -302,18 +302,18 @@ public void externalTableWithPartitions() throws Throwable {
     DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
     fs.mkdirs(externalTableLocation, new FsPermission("777"));
 
-    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> withClause = externalTableBasePathWithClause();
 
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
         .run("create external table t2 (place string) partitioned by (country string) row format "
            + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + "'")
        .run("insert into t2 partition(country='india') values ('bangalore')")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't2'")
         .verifyResults(new String[] { "t2" })
@@ -332,11 +332,11 @@ public void externalTableWithPartitions() throws Throwable {
 
     tuple = primary.run("use " + primaryDbName)
         .run("insert into t2 partition(country='australia') values ('sydney')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select distinct(country) from t2")
         .verifyResults(new String[] { "india", "australia" })
@@ -361,9 +361,9 @@ public void externalTableWithPartitions() throws Throwable {
     primary.run("use " + primaryDbName)
         .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
             .toString() + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] { "paris" })
@@ -377,9 +377,9 @@ public void externalTableWithPartitions() throws Throwable {
 
     primary.run("use " + primaryDbName)
         .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] {})
@@ -397,17 +397,19 @@ public void externalTableWithPartitions() throws Throwable {
     primary.run("use " + primaryDbName)
         .run("insert into table t2 partition(country='france') values ('lyon')")
         .run("alter table t2 set location '" + tmpLocation2 + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
   }
 
   @Test
   public void externalTableIncrementalReplication() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
+    List<String> withClause = externalTableBasePathWithClause();
+    String replDumpCommand = "repl dump " + primaryDbName +
+        " WITH (" + withClause.get(0) + "," + withClause.get(1) + ")";
+    WarehouseInstance.Tuple tuple = primary.dumpWithCommand(replDumpCommand);
     replica.load(replicatedDbName, primaryDbName);
-
     Path externalTableLocation = new Path("/" + testName.getMethodName() + "/t1/");
     DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
@@ -419,12 +421,12 @@ public void externalTableIncrementalReplication() throws Throwable {
             + "'")
         .run("alter table t1 add partition(country='india')")
         .run("alter table t1 add partition(country='us')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
 
     // Add new data externally, to a partition, but under the partition level top directory
-    // Also, it is added after dumping the events but data should be seen at target after REPL LOAD.
+    // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD.
     Path partitionDir = new Path(externalTableLocation, "country=india");
     try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
       outputStream.write("pune\n".getBytes());
@@ -436,16 +438,29 @@ public void externalTableIncrementalReplication() throws Throwable {
     }
 
     List<String> loadWithClause = externalTableBasePathWithClause();
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
        .verifyResult("t1")
        .run("show partitions t1")
        .verifyResults(new String[] { "country=india", "country=us" })
        .run("select place from t1 order by place")
-        .verifyResults(new String[] { "bangalore", "mumbai", "pune" })
+        .verifyResults(new String[] {})
        .verifyReplTargetProperty(replicatedDbName);
+    // The Data should be seen after next dump-and-load cycle.
+    tuple = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show partitions t1")
+        .verifyResults(new String[] {"country=india", "country=us"})
+        .run("select place from t1 order by place")
+        .verifyResults(new String[] {"bangalore", "mumbai", "pune"})
+        .verifyReplTargetProperty(replicatedDbName);
+
     // Delete one of the file and update another one.
     fs.delete(new Path(partitionDir, "file.txt"), true);
     fs.delete(new Path(partitionDir, "file1.txt"), true);
@@ -454,10 +469,10 @@
     }
 
     // Repl load with zero events but external tables location info should present.
-    tuple = primary.dump(primaryDbName);
+    tuple = primary.dump(primaryDbName, withClause);
     assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
@@ -476,7 +491,7 @@
     tuple = primary
         .run("alter table t1 drop partition (country='india')")
         .run("alter table t1 drop partition (country='us')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     replica.load(replicatedDbName, primaryDbName)
         .run("select * From t1")
@@ -521,7 +536,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
         .verifyReplTargetProperty(replicatedDbName);
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+            + REPLICA_EXTERNAL_BASE + "'");
+
     tuple = primary.run("use " + primaryDbName)
         .run("drop table t1")
         .run("create external table t3 (id int)")
@@ -615,7 +633,8 @@ public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable {
         .verifyReplTargetProperty(replicatedDbName);
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'");
 
     WarehouseInstance.Tuple tupleIncWithExternalBootstrap = primary.run("use " + primaryDbName)
         .run("drop table t1")
         .run("create external table t4 (id int)")
@@ -713,9 +732,11 @@ public void testExternalTableDataPath() throws Exception {
 
   @Test
   public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable {
-    List<String> dumpWithClause = Collections.singletonList(
-        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
-    );
+    List<String> dumpWithClause = Arrays.asList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+            + REPLICA_EXTERNAL_BASE + "'");
+
     List<String> loadWithClause = externalTableBasePathWithClause();
     WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
         .run("create external table t1 (id int)")
@@ -841,7 +862,9 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro
     // Take a dump with external tables bootstrapped and load it
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''");
     WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
         .dump(primaryDbName, dumpWithClause);
@@ -858,7 +881,7 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro
 
   @Test
   public void replicationWithTableNameContainsKeywords() throws Throwable {
-    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> withClause = externalTableBasePathWithClause();
 
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -869,9 +892,9 @@ public void replicationWithTableNameContainsKeywords() throws Throwable {
         .run("insert into table t2_constraints partition(country='india') values ('bangalore')")
         .run("insert into table t2_constraints partition(country='us') values ('austin')")
         .run("insert into table t2_constraints partition(country='france') values ('paris')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -890,9 +913,9 @@ public void replicationWithTableNameContainsKeywords() throws Throwable {
         .run("create table t4_tables (id int)")
         .run("insert into table t4_tables values (10)")
        .run("insert into table t4_tables values (20)")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't3_bootstrap'")
         .verifyResults(new String[] {"t3_bootstrap"})
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
index c260a7d3bc..8b2c556ce6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
@@ -477,7 +477,9 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+        "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''");
     tuple = primary.run("use " + primaryDbName)
         .run("drop table t1")
         .run("create external table t3 (id int)")
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index ad6c002085..714bafe9d8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -866,7 +866,9 @@ public void testRenameTableScenariosExternalTable() throws Throwable {
         "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
-        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''"
     );
     replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"};
     bootstrapTables = new String[] {"in2", "in3", "in4", "in5"};
@@ -894,7 +896,9 @@ public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable
     List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
     List<String> dumpWithClause = Arrays.asList(
         "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''"
     );
     String replPolicy = primaryDbName + ".'(in[0-9]+)|(out4)|(out5)|(out1500)'";
     String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause,
@@ -918,7 +922,9 @@ public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable
     String newPolicy = primaryDbName + ".'(in[0-9]+)|(out1500)|(in2)'";
     dumpWithClause = Arrays.asList(
         "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'"
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''"
     );
 
     // in2 should be dropped.
@@ -1044,7 +1050,9 @@ public void testRenameTableScenariosUpgrade() throws Throwable {
         "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
-        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''"
     );
 
     replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5", "in6", "in7", "in9"};
@@ -1059,7 +1067,9 @@ public void testRenameTableScenariosUpgrade() throws Throwable {
 
     dumpWithClause = Arrays.asList(
         "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.pugpb'=''"
     );
 
     // Database replication with ACID and EXTERNAL table.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
index 5c5543cfd9..348ba7f4ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -42,24 +42,22 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 public class ExternalTableCopyTaskBuilder {
   private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class);
-  private final ReplLoadWork work;
+  private final ReplDumpWork work;
   private final HiveConf conf;
 
-  ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) {
+  ExternalTableCopyTaskBuilder(ReplDumpWork work, HiveConf conf) {
     this.work = work;
     this.conf = conf;
   }
 
   List<Task<?>> tasks(TaskTracker tracker) {
     List<Task<?>> tasks = new ArrayList<>();
-    Iterator itr = work.getPathsToCopyIterator();
-    while (tracker.canAddMoreTasks() && itr.hasNext()) {
-      DirCopyWork dirCopyWork = itr.next();
+    while (tracker.canAddMoreTasks() && work.getDirCopyIterator().hasNext()) {
+      DirCopyWork dirCopyWork = work.getDirCopyIterator().next();
       Task task = TaskFactory.get(dirCopyWork, conf);
       tasks.add(task);
       tracker.addTask(task);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index aa59457119..d08d4b6595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -40,9 +40,12 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -78,14 +81,16 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Base64;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.UUID;
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
@@ -122,25 +127,31 @@ public String getName() {
   @Override
   public int execute() {
     try {
-      Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
-          Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
-              .getBytes(StandardCharsets.UTF_8.name())));
-      Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
-      Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-      DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
-      // Initialize ReplChangeManager instance since we will require it to encode file URI.
-      ReplChangeManager.getInstance(conf);
-      Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-      Long lastReplId;
-      if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
-          || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
-        lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+      //First Check if external table copy work has been initialized, if so, just do that and return.
+      if (work.dirCopyIteratorInitialized() || work.replPathIteratorInitialized()) {
+        intitiateDataCopyTasks();
       } else {
-        work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
-        lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        Hive hiveDb = getHive();
+        Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
+                .getBytes(StandardCharsets.UTF_8.name())));
+        Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+        Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+        DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
+        // Initialize ReplChangeManager instance since we will require it to encode file URI.
+        ReplChangeManager.getInstance(conf);
+        Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+        Long lastReplId;
+        if (!dumpRoot.getFileSystem(conf).exists(dumpRoot)
+            || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) {
+          lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        } else {
+          work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot));
+          lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+        }
+        intitiateDataCopyTasks();
+        prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
       }
-      prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -358,7 +369,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     }
 
     Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
-
+    List extTableLocations = new LinkedList<>();
+    List replPathMappings = new ArrayList<>();
     try (Writer writer = new Writer(dumpRoot, conf)) {
       for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
         try {
@@ -367,13 +379,14 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
           // Dump external table locations if required.
           if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
               && shouldDumpExternalTableLocation()) {
-            writer.dataLocationDump(table);
+            extTableLocations.addAll(writer.dataLocationDump(table));
           }
 
           // Dump the table to be bootstrapped if required.
           if (shouldBootstrapDumpTable(table)) {
             HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-            dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple);
+            replPathMappings.addAll(dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+                hiveDb, tableTuple));
           }
           if (tableList != null && isTableSatifiesConfig(table)) {
             tableList.add(tableName);
@@ -386,8 +399,10 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
+      List extTableCopyWorks = dirLocationsToCopy(extTableLocations);
+      work.setDirCopyIterator(extTableCopyWorks.iterator());
+      work.setReplPathIterator(replPathMappings.iterator());
     }
-
     return lastReplId;
   }
 
@@ -484,6 +499,20 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St
     LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
   }
 
+  private List dirLocationsToCopy(List sourceLocations)
+      throws HiveException {
+    List list = new ArrayList<>(sourceLocations.size());
+    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+    // this is done to remove any scheme related information that will be present in the base path
+    // specifically when we are replicating to cloud storage
+    Path basePath = new Path(baseDir);
+    for (Path sourcePath : sourceLocations) {
+      Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
+      list.add(new ExternalTableCopyTaskBuilder.DirCopyWork(sourcePath, targetPath));
+    }
+    return list;
+  }
+
   Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       throws Exception {
     // bootstrap case
@@ -500,6 +529,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
     String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+    List extTableCopyWorks = new ArrayList<>();
+    List replPathMappings = new ArrayList<>();
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("Dumping db: " + dbName);
@@ -522,6 +553,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
       try (Writer writer = new Writer(dbRoot, conf)) {
+        List extTableLocations = new LinkedList<>();
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
           Table table = null;
@@ -532,10 +564,10 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
             if (shouldDumpExternalTableLocation()
                 && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
               LOG.debug("Adding table {} to external tables list", tblName);
-              writer.dataLocationDump(tableTuple.object);
+              extTableLocations.addAll(writer.dataLocationDump(tableTuple.object));
             }
-            dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb,
-                tableTuple);
+            replPathMappings.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+                hiveDb, tableTuple));
           } catch (InvalidTableException te) {
             // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
             // Just log a debug message and skip it.
@@ -547,6 +579,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           }
         }
         dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
+        extTableCopyWorks = dirLocationsToCopy(extTableLocations);
       } catch (Exception e) {
         caught = e;
       } finally {
@@ -572,9 +605,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
         dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
     dmd.write();
-
-    // Set the correct last repl id to return to the user
-    // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap
+    work.setDirCopyIterator(extTableCopyWorks.iterator());
+    work.setReplPathIterator(replPathMappings.iterator());
     return bootDumpBeginReplId;
   }
 
@@ -592,8 +624,8 @@ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb)
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId,
-      Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+  List dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot,
+      long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
@@ -616,15 +648,28 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
     replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
     if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)
         || Utils.shouldDumpMetaDataOnly(conf)) {
-      return;
+      return Collections.EMPTY_LIST;
     }
-    for (ReplPathMapping replPathMapping: replPathMappings) {
-      Task copyTask = ReplCopyTask.getLoadCopyTask(
-          tuple.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf, false);
-      this.addDependentTask(copyTask);
-      LOG.info("Scheduled a repl copy task from [{}] to [{}]",
-          replPathMapping.getSrcPath(), replPathMapping.getTargetPath());
+    return replPathMappings;
+  }
+
+  private void intitiateDataCopyTasks() {
+    Iterator extCopyWorkItr = work.getDirCopyIterator();
+    Iterator mangedTablCopyPathItr = work.getReplPathIterator();
+    List<Task<?>> childTasks = new ArrayList<>();
+    int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+    TaskTracker taskTracker = new TaskTracker(maxTasks);
+    while (taskTracker.canAddMoreTasks()
+        && ((work.dirCopyIteratorInitialized() && extCopyWorkItr.hasNext())
+        || (work.replPathIteratorInitialized() && mangedTablCopyPathItr.hasNext()))) {
+      if (extCopyWorkItr.hasNext()) {
+        childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(taskTracker));
+      } else {
+        childTasks.addAll(ReplPathMapping.tasks(work, taskTracker, conf));
+      }
     }
+    DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+    this.childTasks = childTasks;
   }
 
   private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 9b11bae6ba..03964cdcfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -20,10 +20,12 @@
 import com.google.common.primitives.Ints;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
@@ -36,6 +38,8 @@
   Long eventFrom;
   static String testInjectDumpDir = null;
   private Integer maxEventLimit;
+  private transient Iterator dirCopyIterator;
+  private transient Iterator replPathIterator;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
@@ -87,4 +91,31 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception
         .debug("eventTo not specified, using current event id : {}", eventTo);
     }
   }
+
+  public Iterator getDirCopyIterator() {
+    return dirCopyIterator;
+  }
+
+  public void setDirCopyIterator(Iterator dirCopyIterator) {
+    if (dirCopyIteratorInitialized()) {
+      throw new IllegalStateException("Dir Copy iterator has already been initialized");
+    }
+    this.dirCopyIterator = dirCopyIterator;
+  }
+
+  public boolean dirCopyIteratorInitialized() {
+    return dirCopyIterator != null;
+  }
+
+  public Iterator getReplPathIterator() {
+    return replPathIterator;
+  }
+
+  public void setReplPathIterator(Iterator replPathIterator) {
+    this.replPathIterator = replPathIterator;
+  }
+
+  public boolean replPathIteratorInitialized() {
+    return replPathIterator != null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index c7aa0077a6..fddee28790 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -43,6 +43,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -120,11 +121,12 @@ private boolean shouldWrite() {
     /**
      * this will dump a single line per external table. it can include additional lines for the same
      * table if the table is partitioned and the partition location is outside the table.
+     * It returns list of all the external table locations.
      */
-    void dataLocationDump(Table table)
-        throws InterruptedException, IOException, HiveException {
+    List dataLocationDump(Table table) throws InterruptedException, IOException, HiveException {
+      List extTableLocations = new LinkedList<>();
       if (!shouldWrite()) {
-        return;
+        return extTableLocations;
       }
       if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
         throw new IllegalArgumentException(
@@ -134,6 +136,7 @@ void dataLocationDump(Table table)
       Path fullyQualifiedDataLocation =
          PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
       write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+      extTableLocations.add(fullyQualifiedDataLocation);
       if (table.isPartitioned()) {
         List partitions;
         try {
@@ -142,7 +145,7 @@ void dataLocationDump(Table table)
           if (e.getCause() instanceof NoSuchObjectException) {
             // If table is dropped when dump in progress, just skip partitions data location dump
             LOG.debug(e.getMessage());
-            return;
+            return extTableLocations;
           }
           throw e;
         }
@@ -155,9 +158,11 @@ void dataLocationDump(Table table)
           fullyQualifiedDataLocation = PathBuilder
              .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf));
           write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+          extTableLocations.add(fullyQualifiedDataLocation);
         }
       }
     }
+    return extTableLocations;
   }
 
   private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index a2c467bafd..ab99b9c606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -135,16 +135,8 @@ a database ( directory )
       if (!iterator.hasNext() && constraintIterator.hasNext()) {
         loadingConstraint = true;
       }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) ||
-          (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) {
-        // First start the distcp tasks to copy the files related to external table. The distcp tasks should be
-        // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these
-        // directory with proper permission and owner.
-        if (work.getPathsToCopyIterator().hasNext()) {
-          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
-          break;
-        }
-
+      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()))
+          && loadTaskTracker.canAddMoreTasks()) {
         BootstrapEvent next;
         if (!loadingConstraint) {
           next = iterator.next();
@@ -261,8 +253,7 @@ a database ( directory )
       boolean addAnotherLoadTask = iterator.hasNext()
           || loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext()
-          || work.getPathsToCopyIterator().hasNext();
+          || constraintIterator.hasNext();
 
       if (addAnotherLoadTask) {
         createBuilderTask(scope.rootTasks);
@@ -271,8 +262,7 @@ a database ( directory )
       // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
       // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
       // last repl ID of the database.
-      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext()
-          && !work.isIncrementalLoad()) {
+      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
         loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
         work.updateDbEventState(null);
       }
@@ -498,7 +488,7 @@ private int executeIncrementalLoad() {
     IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
     // If incremental events are already applied, then check and perform if need to bootstrap any tables.
-    if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+    if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
       if (work.hasBootstrapLoadTasks()) {
         LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
             + "mode after applying all events.");
@@ -509,20 +499,13 @@ private int executeIncrementalLoad() {
     List<Task<?>> childTasks = new ArrayList<>();
     int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-    // First start the distcp tasks to copy the files related to external table. The distcp tasks should be
-    // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these
-    // directory with proper permission and owner.
     TaskTracker tracker = new TaskTracker(maxTasks);
-    if (work.getPathsToCopyIterator().hasNext()) {
-      childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
-    } else {
-      childTasks.add(builder.build(context, getHive(), LOG, tracker));
-    }
+    childTasks.add(builder.build(context, getHive(), LOG, tracker));
 
     // If there are no more events to be applied, add a task to update the last.repl.id of the
     // target database to the event id of the last event considered by the dump. Next
     // incremental cycle won't consider the events in this dump again if it starts from this id.
-    if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+    if (!builder.hasMoreWork()) {
       // The name of the database to be loaded into is either specified directly in REPL LOAD
       // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
       // metadata during table level replication.
@@ -551,14 +534,13 @@ private int executeIncrementalLoad() {
           TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
       DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+      work.setLastReplIDUpdated(true);
       LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
     }
   }
 
-    // Either the incremental has more work or the external table file copy has more paths to process.
-    // Once all the incremental events are applied and external tables file copies are done, enable
-    // bootstrap of tables if exist.
-    if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) {
+    // Once all the incremental events are applied, enable bootstrap of tables if exist.
+    if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
       DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
     }
     this.childTasks = childTasks;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f25c71403d..fef302275c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -35,9 +35,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
 
 @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
@@ -48,6 +45,7 @@
   final String dumpDirectory;
   final String bootstrapDumpToCleanTables;
   boolean needCleanTablesFromBootstrap;
+  private boolean lastReplIDUpdated;
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -55,7 +53,6 @@
   private final transient BootstrapEventsIterator bootstrapIterator;
   private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
   private transient Task rootTask;
-  private final transient Iterator pathsToCopyIterator;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -66,8 +63,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
                       String dbNameToLoadIn, ReplScope currentReplScope,
-                      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
-                      List pathsToCopyIterator) throws IOException {
+                      LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
@@ -106,7 +102,6 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
       this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       incrementalLoadTasksBuilder = null;
     }
-    this.pathsToCopyIterator = pathsToCopyIterator.iterator();
   }
 
   BootstrapEventsIterator bootstrapIterator() {
@@ -154,7 +149,11 @@ public void setRootTask(Task rootTask) {
     this.rootTask = rootTask;
   }
 
-  public Iterator getPathsToCopyIterator() {
-    return pathsToCopyIterator;
+  public boolean isLastReplIDUpdated() {
+    return lastReplIDUpdated;
+  }
+
+  public void setLastReplIDUpdated(boolean lastReplIDUpdated) {
+    this.lastReplIDUpdated = lastReplIDUpdated;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index e65cbf50e6..761366cae7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -30,8 +30,11 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -51,10 +54,10 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -161,10 +164,12 @@ public void setOpenTxnTask(Task openTxnTask) {
    * Wrapper class for mapping replication source and target path for copying data.
    */
   public static class ReplPathMapping {
+    private ReplicationSpec replicationSpec;
     private Path srcPath;
     private Path tgtPath;
 
-    public ReplPathMapping(Path srcPath, Path tgtPath) {
+    public ReplPathMapping(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+      this.replicationSpec = replicationSpec;
       if (srcPath == null) {
         throw new IllegalArgumentException("Source Path can not be null.");
       }
@@ -190,6 +195,36 @@ public Path getTargetPath() {
     public void setTargetPath(Path targetPath) {
       this.tgtPath = targetPath;
     }
+
+    @Override
+    public String toString() {
+      return "ReplPathMapping{" +
+          "fullyQualifiedSourcePath=" + srcPath +
+          ", fullyQualifiedTargetPath=" + tgtPath +
+          '}';
+    }
+
+    public static List<Task<?>> tasks(ReplDumpWork work, TaskTracker tracker, HiveConf conf) {
+      List<Task<?>> tasks = new ArrayList<>();
+      while (tracker.canAddMoreTasks() && work.getReplPathIterator().hasNext()) {
+        ReplPathMapping replPathMapping = work.getReplPathIterator().next();
+        Task copyTask = ReplCopyTask.getLoadCopyTask(
+            replPathMapping.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf,
+            false);
+        tasks.add(copyTask);
+        tracker.addTask(copyTask);
+        LOG.debug("added task for {}", replPathMapping);
+      }
+      return tasks;
+    }
+
+    public ReplicationSpec getReplicationSpec() {
+      return replicationSpec;
+    }
+
+    public void setReplicationSpec(ReplicationSpec replicationSpec) {
+      this.replicationSpec = replicationSpec;
+    }
   }
 
   private EximUtil() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c2e9f883ce..2db281b81c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -403,8 +403,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         }
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(),
             replScope.getDbName(), dmd.getReplScope(),
-            queryState.getLineageState(), evDump, dmd.getEventTo(),
-            dirLocationsToCopy(loadPath, evDump));
+            queryState.getLineageState(), evDump, dmd.getEventTo());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 454998f02c..10406ff40c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -122,7 +122,8 @@
         new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
             .export(isExportTask);
         LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
-        return new ReplPathMapping(partition.getDataLocation(), new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
+        return new ReplPathMapping(forReplicationSpec, partition.getDataLocation(),
+            new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
       } catch (Exception e) {
         throw new RuntimeException(e.getMessage(), e);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index a26b15948c..d3d2b762aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -175,7 +175,8 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio
       } else {
         List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
             replicationSpec, conf);
-        replCopyPathMappings.add(new ReplPathMapping(tableSpec.tableHandle.getDataLocation(), paths.dataExportDir()));
+        replCopyPathMappings.add(new ReplPathMapping(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+            paths.dataExportDir()));
         new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
             .export(isExportTask);
       }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 2651ea49e1..9f6f723fb0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.junit.Test;
@@ -127,13 +128,15 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
     private int tableDumpCount = 0;
 
     @Override
-    void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir,
-        long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
+    List dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
+        Path replDataDir, long lastReplId, Hive hiveDb,
+        HiveWrapper.Tuple<Table> tuple)
         throws Exception {
       tableDumpCount++;
       if (tableDumpCount > 1) {
         throw new TestException();
       }
+      return Collections.EMPTY_LIST;
     }
   };