diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a4533c4a67..1c06ca9dd5 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -526,8 +526,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n " + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n" + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."), - REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", "/", - "This is the base directory on the target/replica warehouse under which data for " + REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", null, + "This is the fully qualified base directory on the target/replica warehouse under which data for " + "external tables is stored. It is prefixed to the source external table path to derive " + "the data location on the target cluster."), REPL_INCLUDE_AUTHORIZATION_METADATA("hive.repl.include.authorization.metadata", false, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java index 6b96d2b800..0fa1a81902 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -46,6 +48,8 @@ String primaryDbName, replicatedDbName; static HiveConf conf; // for primary static HiveConf replicaConf; + protected static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base"); + protected static String fullyQualifiedReplicaExternalBase; static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) throws Exception { @@ -59,6 +63,7 @@ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); }}; localOverrides.putAll(overrides); + setReplicaExternalBase(miniDFSCluster.getFileSystem(), localOverrides); primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); @@ -68,33 +73,39 @@ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) static void internalBeforeClassSetupExclusiveReplica(Map<String, String> primaryOverrides, Map<String, String> replicaOverrides, Class clazz) throws Exception { - conf = new HiveConf(clazz); - conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); - String primaryBaseDir = Files.createTempDirectory("base").toFile().getAbsolutePath(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir); - MiniDFSCluster miniPrimaryDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - Map<String, String> localOverrides = new HashMap<String, String>() { - { - put("fs.defaultFS", miniPrimaryDFSCluster.getFileSystem().getUri().toString()); - put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); - } - }; - localOverrides.putAll(primaryOverrides); - primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster, localOverrides); + /** + * Setup replica cluster + */ String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); replicaConf = new HiveConf(clazz); replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); replicaConf.set("dfs.client.use.datanode.hostname", "true"); replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniReplicaDFSCluster = new MiniDFSCluster.Builder(replicaConf).numDataNodes(1).format(true).build(); - localOverrides.clear(); - localOverrides.putAll(replicaOverrides); + + Map<String, String> localOverrides = new HashMap<>(); localOverrides.put("fs.defaultFS", miniReplicaDFSCluster.getFileSystem().getUri().toString()); localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); localOverrides.putAll(replicaOverrides); + setReplicaExternalBase(miniReplicaDFSCluster.getFileSystem(), localOverrides); replica = new WarehouseInstance(LOG, miniReplicaDFSCluster, localOverrides); + + /** + * Setup primary cluster + */ + String primaryBaseDir = Files.createTempDirectory("base").toFile().getAbsolutePath(); + conf = new HiveConf(clazz); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniPrimaryDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + localOverrides.clear(); + localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + localOverrides.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, fullyQualifiedReplicaExternalBase); + localOverrides.put("fs.defaultFS", miniPrimaryDFSCluster.getFileSystem().getUri().toString()); + localOverrides.putAll(primaryOverrides); + primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster, localOverrides); } @AfterClass @@ -103,6 +114,12 @@ public static void classLevelTearDown() throws IOException { replica.close(); } + private static void setReplicaExternalBase(FileSystem fs, Map<String, String> confMap) throws IOException { + fs.mkdirs(REPLICA_EXTERNAL_BASE); + fullyQualifiedReplicaExternalBase = fs.getFileStatus(REPLICA_EXTERNAL_BASE).getPath().toString(); + confMap.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, fullyQualifiedReplicaExternalBase); + } + @Before public void setup() throws Throwable { primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java index 2284fcad5d..521a2ef56d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; @@ -58,6 +60,8 @@ public final TestName testName = new TestName(); protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); + private static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base"); + protected static String fullyQualifiedReplicaExternalBase; static WarehouseInstance primary; static WarehouseInstance replica, replicaNonAcid; static HiveConf conf; @@ -88,6 +92,7 @@ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) acidEnableConf.putAll(overrides); + setReplicaExternalBase(miniDFSCluster.getFileSystem(), acidEnableConf); primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); @@ -101,6 +106,12 @@ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); } + private static void setReplicaExternalBase(FileSystem fs, Map<String, String> confMap) throws IOException { + fs.mkdirs(REPLICA_EXTERNAL_BASE); + fullyQualifiedReplicaExternalBase = fs.getFileStatus(REPLICA_EXTERNAL_BASE).getPath().toString(); + confMap.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, fullyQualifiedReplicaExternalBase); + } + @AfterClass public static void classLevelTearDown() throws IOException { primary.close(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index 902c731be8..8a44443951 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -502,19 +502,11 @@ public static void insertForMerge(WarehouseInstance primary, String primaryDbNam "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); } - public static List<String> externalTableBasePathWithClause(String replExternalBase, WarehouseInstance replica) - throws IOException, SemanticException { - Path externalTableLocation = new Path(replExternalBase); - DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); - externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem); - fileSystem.mkdirs(externalTableLocation); - - // this is required since the same filesystem is used in both source and target - return Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + externalTableLocation.toString() + "'", - "'distcp.options.pugpb'=''" - ); + public static List<String> externalTableClause(boolean enable) { + List<String> withClause = new ArrayList<>(); + withClause.add("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='" + enable + "'"); + withClause.add("'distcp.options.pugpb'=''"); + return withClause; } public static List<String> externalTableWithClause(List<String> externalTableBasePathWithClause, Boolean bootstrap, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index 371f90bb1f..7e6b5e29ab 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -43,8 +43,6 @@ */ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; - @BeforeClass public static void classLevelSetup() throws Exception { Map<String, String> overrides = new HashMap<>(); @@ -70,7 +68,6 @@ public void tearDown() throws Throwable { @Test public void externalTableReplicationWithRemoteStaging() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); - withClauseOptions.addAll(externalTableBasePathWithClause()); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -126,7 +123,6 @@ public void externalTableReplicationWithRemoteStaging() throws Throwable { @Test public void externalTableReplicationWithLocalStaging() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); - withClauseOptions.addAll(externalTableBasePathWithClause()); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -185,10 +181,6 @@ public void externalTableReplicationWithLocalStaging() throws Throwable { return confList; } - private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } - private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental, 
WarehouseInstance warehouseInstance) throws IOException { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index b078ea1c58..bd85225ba0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -66,7 +66,6 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; String extraPrimaryDb; @BeforeClass @@ -96,10 +95,7 @@ public void tearDown() throws Throwable { @Test public void replicationWithoutExternalTables() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList - ("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); - + List<String> withClause = ReplicationTestUtils.externalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -109,7 +105,7 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values ('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), @@ -117,7 +113,7 @@ public void replicationWithoutExternalTables() throws Throwable { assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -131,7 +127,7 @@ public void replicationWithoutExternalTables() throws Throwable { .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("insert into table t3 values (20)") - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), @@ -139,7 +135,7 @@ public void replicationWithoutExternalTables() throws Throwable { assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(null)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't3'") .verifyFailure(new String[] { "t3" }) @@ -148,7 +144,6 @@ @Test public void externalTableReplicationWithDefaultPaths() throws Throwable { - List<String> withClauseOptions = externalTableBasePathWithClause(); //creates external tables with partitions WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ 
-159,14 +154,14 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values ('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dump(primaryDbName, withClauseOptions); + .dump(primaryDbName); // verify that the external table info is written correctly for bootstrap assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); - replica.load(replicatedDbName, primaryDbName, withClauseOptions) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -191,12 +186,12 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("create external table t4 as select id from t3") - .dump(primaryDbName, withClauseOptions); + .dump(primaryDbName); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); - replica.load(replicatedDbName, primaryDbName, withClauseOptions) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't3'") .verifyResult("t3") @@ -209,7 +204,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { tuple = primary.run("use " + primaryDbName) .run("drop table t1") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); @@ -254,10 +249,7 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. 
- externalTableBasePathWithClause(); List<String> withClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" ); @@ -311,7 +303,7 @@ public void externalTableWithPartitions() throws Throwable { DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " @@ -414,7 +406,7 @@ public void externalTableWithPartitions() throws Throwable { @Test public void externalTableIncrementalCheckpointing() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -439,7 +431,7 @@ public void externalTableIncrementalCheckpointing() throws Throwable { ReplDumpWork.testDeletePreviousDumpMetaPath(true); - withClause = externalTableWithClause(true, true); + withClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) .run("drop table t1") .run("insert into table t2 values (5)") @@ -503,8 +495,7 @@ public void externalTableIncrementalCheckpointing() throws Throwable { @Test public void externalTableIncrementalReplication() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, withClause); + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); replica.load(replicatedDbName, primaryDbName); Path externalTableLocation = new Path("/" + testName.getMethodName() + "/t1/"); @@ -517,7 +508,7 @@ public void externalTableIncrementalReplication() throws Throwable { + "'") .run("alter table t1 add partition(country='india')") .run("alter table t1 add partition(country='us')") - .dump(primaryDbName, withClause); + .dump(primaryDbName); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); @@ -533,8 +524,7 @@ public void externalTableIncrementalReplication() throws Throwable { outputStream.write("bangalore\n".getBytes()); } - List<String> loadWithClause = externalTableBasePathWithClause(); - replica.load(replicatedDbName, primaryDbName, withClause) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -546,8 +536,8 @@ public void externalTableIncrementalReplication() throws Throwable { // The Data should be seen after next dump-and-load cycle. tuple = primary.run("use " + primaryDbName) - .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, withClause) + .dump(primaryDbName); + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -565,10 +555,10 @@ public void externalTableIncrementalReplication() throws Throwable { } // Repl load with zero events but external tables location info should present. 
- tuple = primary.dump(primaryDbName, withClause); + tuple = primary.dump(primaryDbName); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); - replica.load(replicatedDbName, primaryDbName, withClause) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -587,7 +577,7 @@ public void externalTableIncrementalReplication() throws Throwable { tuple = primary .run("alter table t1 drop partition (country='india')") .run("alter table t1 drop partition (country='us')") - .dump(primaryDbName, withClause); + .dump(primaryDbName); replica.load(replicatedDbName, primaryDbName) .run("select * From t1") @@ -601,7 +591,6 @@ public void externalTableIncrementalReplication() throws Throwable { @Test public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); List<String> dumpWithClause = Collections.singletonList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" ); @@ -623,7 +612,7 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(null)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -633,7 +622,7 @@ .verifyFailure(new String[] {"t2" }) .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); tuple = primary.run("use " + primaryDbName) .run("drop table t1") @@ -664,7 +653,7 @@ tblPath = new Path(dbPath, "t3"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -701,12 +690,8 @@ @Test public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable { - List<String> loadWithClause = new ArrayList<>(); - loadWithClause.addAll(externalTableBasePathWithClause()); - - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(false); WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary .run("use " + primaryDbName) @@ -728,7 +713,7 @@ .verifyResult("1") .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t4 (id int)") @@ -777,7 +762,7 @@ public Boolean apply(@Nullable CallerArguments args) { // Insert into existing external table and then Drop it, add another managed table with same name // and 
dump another bootstrap dump for external tables. - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = ReplicationTestUtils.externalTableClause(true); primary.run("use " + primaryDbName) .run("insert into table t2 partition(country='india') values ('chennai')") .run("drop table t2") @@ -823,8 +808,8 @@ public void testExternalTableDataPath() throws Exception { @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List<String> dumpWithClause = this.externalTableWithClause(null, true); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), null, true); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -878,10 +863,8 @@ public Table apply(@Nullable Table table) { @Test public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -917,7 +900,7 @@ @Test public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); List<String> dumpWithClause = Collections.singletonList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" ); @@ -942,13 +925,13 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro // This looks like an empty dump but it has the ALTER TABLE event created by the previous // dump. We need it here so that the next dump won't have any events. 
- WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName); + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, ReplicationTestUtils.externalTableClause(true)); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(incTuple.lastReplicationId); // Take a dump with external tables bootstrapped and load it - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName) .dump(primaryDbName, dumpWithClause); @@ -965,7 +948,7 @@ public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Thro @Test public void replicationWithTableNameContainsKeywords() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -1008,15 +991,48 @@ public void replicationWithTableNameContainsKeywords() throws Throwable { .verifyReplTargetProperty(replicatedDbName); } + @Test + public void testExternalTableBaseDirMandatory() throws Throwable { + List<String> withClause = ReplicationTestUtils.externalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='/extTablebase'"); + WarehouseInstance.Tuple tuple = null; + try { + primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values(1)") + .dump(primaryDbName, withClause); + } catch (SemanticException ex) { + assertTrue(ex.getMessage().contains( + "Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required")); + } + withClause = ReplicationTestUtils.externalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + + "'='" + fullyQualifiedReplicaExternalBase + "'"); + tuple = primary.run("use " + primaryDbName) + .dump(primaryDbName, withClause); + + withClause = ReplicationTestUtils.externalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'=''"); + try { + replica.load(replicatedDbName, primaryDbName, withClause); + } catch (SemanticException ex) { + assertEquals(ex.getMessage(), "Config 'hive.repl.replica.external.table.base.dir' is required"); + } + + withClause = ReplicationTestUtils.externalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + + "'='" + fullyQualifiedReplicaExternalBase + "'"); - private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } + replica.load(replicatedDbName, primaryDbName, withClause); - private List<String> externalTableWithClause(Boolean bootstrapExtTbl, Boolean includeExtTbl) - throws IOException, SemanticException { - List<String> extTblBaseDir = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl); + replica.run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResults(new String[] {"t1"}) + .run("select id from t1") + .verifyResults(new String[] {"1"}) + .verifyReplTargetProperty(replicatedDbName); } private void assertExternalFileInfo(List<String> expected, String dumplocation, 
@@ -1044,4 +1060,5 @@ private String relativeExtInfoPath(String dbName) { return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; } } + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index e3e66612ec..1a91f9f0b6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -60,7 +60,6 @@ */ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; String extraPrimaryDb; @BeforeClass @@ -91,9 +90,8 @@ public void tearDown() throws Throwable { @Test public void replicationWithoutExternalTables() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Arrays.asList("'" - + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -153,7 +151,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { // verify that the external table info is not written as metadata only replication assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); - List<String> withClauseOptions = externalTableBasePathWithClause(); + List<String> withClauseOptions = ReplicationTestUtils.externalTableClause(true); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -208,10 +206,7 @@ public void externalTableReplicationWithCustomPaths() throws Throwable { // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. 
- externalTableBasePathWithClause(); List<String> loadWithClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" ); @@ -263,7 +258,7 @@ public void externalTableWithPartitions() throws Throwable { DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " @@ -391,7 +386,7 @@ public void externalTableIncrementalReplication() throws Throwable { outputStream.write("bangalore\n".getBytes()); } - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) .run("show tables like 't1'") @@ -446,9 +441,8 @@ public void externalTableIncrementalReplication() throws Throwable { @Test public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause - = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -478,8 +472,8 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''"); + loadWithClause = ReplicationTestUtils.externalTableClause(true); tuple = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t3 (id int)") @@ -548,10 +542,8 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(true); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -606,15 +598,12 @@ public Table apply(@Nullable Table table) { @Test public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> withClause = ReplicationTestUtils.externalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") 
.run("insert into table t1 values (1)") .run("insert into table t1 values (2)") - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -622,8 +611,8 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { // This looks like an empty dump but it has the ALTER TABLE event created by the previous // dump. We need it here so that the next dump won't have any events. - WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause) .status(replicatedDbName) .verifyResult(incTuple.lastReplicationId); @@ -633,20 +622,16 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb) .run("create table tbl (fld int)") .run("use " + primaryDbName) - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(), Long.valueOf(inc2Tuple.lastReplicationId).longValue()); // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .status(replicatedDbName) .verifyResult(inc2Tuple.lastReplicationId); } - private List externalTableBasePathWithClause() throws IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } - private void assertFalseExternalFileInfo(Path externalTableInfoFile) throws IOException { DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index 642a8dc043..0645cef15e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -64,6 +64,7 @@ */ public class TestReplicationWithTableMigration { private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc"; + private static String fullyQualifiedReplicaExternalBase; @Rule public final TestName testName = new TestName(); @@ -119,6 +120,8 @@ static void internalBeforeClassSetup(Map overrideConfigs) throws primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); + fullyQualifiedReplicaExternalBase = miniDFSCluster.getFileSystem().getFileStatus( + new Path("/")).getPath().toString(); } private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException { @@ -557,6 +560,7 @@ public void testMigrationWithUpgrade() throws Throwable { withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'"); withConfigs.add("'hive.repl.dump.include.acid.tables'='true'"); withConfigs.add("'hive.repl.include.external.tables'='true'"); + withConfigs.add("'hive.repl.replica.external.table.base.dir' = '" + fullyQualifiedReplicaExternalBase 
+ "'"); withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'"); tuple = primary.dump(primaryDbName, withConfigs); replica.load(replicatedDbName, primaryDbName, withConfigs); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java index 017607e87b..ec0c7d6381 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -183,8 +183,7 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable { @Ignore("HIVE-23395") public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { // Bootstrap - String withClause = " WITH('" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname - + "'='/replica_external_base', '" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + String withClause = " WITH('" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "' = 'true' ,'" + HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA + "' = 'true' , '" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_ENDPOINT + "' = 'http://localhost:21000/atlas'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB + "' = 'tgt'" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 5bdf5515e5..3d83ee83fb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -52,8 +52,6 @@ */ public class TestTableLevelReplicationScenarios extends BaseReplicationScenariosAcidTables { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; - @BeforeClass public static void classLevelSetup() throws Exception { Map overrides = new HashMap<>(); @@ -490,10 +488,8 @@ public void testBootstrapExternalTablesWithIncludeAndExcludeList() throws Throwa createTables(originalExternalTables, CreateTableType.EXTERNAL); // Replicate and verify if only 2 tables are replicated to target. - List loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List loadWithClause = ReplicationTestUtils.externalTableClause(true); + List dumpWithClause = ReplicationTestUtils.externalTableClause(true); String replPolicy = primaryDbName + ".'(a[0-9]+)|(b2)'.'a1'"; String[] replicatedTables = new String[] {"a2", "b2"}; WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) @@ -524,10 +520,9 @@ public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList createTables(originalExternalTables, CreateTableType.EXTERNAL); // Bootstrap should exclude external tables. 
- List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(false); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); + String replPolicy = primaryDbName + ".'(a[0-9]+)|(b2)'.'a1'"; String[] bootstrapReplicatedTables = new String[] {"b2"}; String lastReplId = replicateAndVerify(replPolicy, null, @@ -535,10 +530,11 @@ testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList // Enable external tables replication and bootstrap in incremental phase. String[] incrementalReplicatedTables = new String[] {"a2", "b2"}; - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = ReplicationTestUtils.externalTableClause(true); + dumpWithClause.add("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, dumpWithClause); + loadWithClause = ReplicationTestUtils.externalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. @@ -661,10 +657,8 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws createTables(originalExternalTables, CreateTableType.EXTERNAL); // Bootstrap should exclude external tables. - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.externalTableClause(false); String replPolicy = primaryDbName + ".'(a[0-9]+)|(b1)'.'a1'"; String[] bootstrapReplicatedTables = new String[] {"b1"}; String lastReplId = replicateAndVerify(replPolicy, null, @@ -689,6 +683,7 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, oldReplPolicy, dumpWithClause); + loadWithClause = ReplicationTestUtils.externalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. 
@@ -835,7 +830,7 @@ public void testRenameTableScenariosAcidTable() throws Throwable { @Test public void testRenameTableScenariosExternalTable() throws Throwable { String replPolicy = primaryDbName + ".'in[0-9]+'.'out[0-9]+'"; - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); List<String> dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", @@ -868,9 +863,9 @@ public void testRenameTableScenariosExternalTable() throws Throwable { "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); + loadWithClause = ReplicationTestUtils.externalTableClause(true); replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"}; bootstrapTables = new String[] {"in2", "in3", "in4", "in5"}; lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, @@ -894,7 +889,7 @@ public void testRenameTableScenariosExternalTable() throws Throwable { @Test public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable { - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(true); List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(loadWithClause, true, true); String replPolicy = primaryDbName + ".'(in[0-9]+)|(out4)|(out5)|(out1500)'"; String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, @@ -919,7 +914,6 @@ public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); @@ -1010,7 +1004,7 @@ public void testRenameTableScenariosWithReplacePolicyDMLOperattion() throws Thro public void testRenameTableScenariosUpgrade() throws Throwable { // Policy with no table level filter, no ACID and external table. 
String replPolicy = primaryDbName; - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.externalTableClause(false); List<String> dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'" @@ -1047,7 +1041,6 @@ public void testRenameTableScenariosUpgrade() throws Throwable { "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); @@ -1064,7 +1057,6 @@ public void testRenameTableScenariosUpgrade() throws Throwable { dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index f21fb7d3dd..68b5ebd262 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -91,6 +92,7 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -750,10 +752,19 @@ private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, St private List<DirCopyWork> dirLocationsToCopy(List<Path> sourceLocations) throws HiveException { + if (sourceLocations.isEmpty()) { + return Collections.emptyList(); + } List<DirCopyWork> list = new ArrayList<>(sourceLocations.size()); String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); // this is done to remove any scheme related information that will be present in the base path // specifically when we are replicating to cloud storage + URI basePathUri = StringUtils.isEmpty(baseDir) ? null : new Path(baseDir).toUri(); + if (basePathUri == null || basePathUri.getScheme() == null || basePathUri.getAuthority() == null) { + throw new SemanticException( + String.format("Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required %s", + baseDir == null ? 
"" : "- ('" + baseDir + "')")); + } Path basePath = new Path(baseDir); for (Path sourcePath : sourceLocations) { Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 0fdd1bf0bb..0f3ce69e90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -66,6 +67,9 @@ private ReplExternalTables(){} public static String externalTableLocation(HiveConf hiveConf, String location) throws SemanticException { String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); + if (StringUtils.isEmpty(baseDir)) { + throw new SemanticException("Config 'hive.repl.replica.external.table.base.dir' is required"); + } Path basePath = new Path(baseDir); Path currentPath = new Path(location); Path dataLocation = externalTableDataPath(hiveConf, basePath, currentPath);