diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
index 78f4cbeaf5..3d8e3984c6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -44,6 +46,8 @@ public static void classLevelSetup() throws Exception {
         "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
     overrides.put(MetastoreConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.getVarname(),
         "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+        UserGroupInformation.getCurrentUser().getUserName());
 
     internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 6002d7349e..84699ce192 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
+import org.apache.hadoop.hive.ql.parse.WarehouseInstance.Tuple;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
@@ -67,6 +69,7 @@
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -1671,6 +1674,33 @@ public void testHdfsNameserviceLazyCopy() throws Throwable {
     }
   }
 
+  @Test
+  public void testHdfsNSLazyCopyBootStrapExtTbls() throws Throwable {
+    List<String> clause = getHdfsNameserviceClause();
+    clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+    Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table ext_table1 (id int)")
+        .run("insert into ext_table1 values (3)")
+        .run("insert into ext_table1 values (4)")
+        .run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)")
+        .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)")
.run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)") + .run("show partitions ext_table2") + .verifyResults(new String[] { + "load_time=2012-02-21 07%3A08%3A09.123", + "load_time=2012-02-21 07%3A08%3A09.124"}) + .dump(primaryDbName, clause); + + assertExternalFileInfo(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, false, primary); + //SecurityException expected from DirCopyTask + try{ + replica.load(replicatedDbName, primaryDbName, clause); + Assert.fail("Expected the UnknownHostException to be thrown."); + } catch (SecurityException ex) { + assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote")); + } + } + @Test public void testHdfsNameserviceLazyCopyIncr() throws Throwable { List clause = getHdfsNameserviceClause(); @@ -1710,6 +1740,49 @@ public void testHdfsNameserviceLazyCopyIncr() throws Throwable { } } + @Test + public void testHdfsNSLazyCopyIncrExtTbls() throws Throwable { + List clause = getHdfsNameserviceClause(); + clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_time timestamp) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)") + .dump(primaryDbName); + + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + + Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table ext_table1 (id int)") + .run("insert into ext_table1 values (3)") + .run("insert into ext_table1 values (4)") + .run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)") + .run("show partitions ext_table2") + .verifyResults(new String[] { + "load_time=2012-02-21 07%3A08%3A09.123", + "load_time=2012-02-21 07%3A08%3A09.124"}) + .dump(primaryDbName, clause); + + assertExternalFileInfo(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, true, primary); + //SecurityException expected from DirCopyTask + try{ + replica.load(replicatedDbName, primaryDbName, clause); + Assert.fail("Expected the UnknownHostException to be thrown."); + } catch (SecurityException ex) { + assertTrue(ex.getMessage().contains("java.net.UnknownHostException: nsRemote")); + } + } + @Test public void testHdfsNameserviceWithDataCopy() throws Throwable { List clause = getHdfsNameserviceClause(); @@ -1735,9 +1808,32 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable { replica.load(replicatedDbName, primaryDbName, clause) .run("use " + replicatedDbName) .run("show tables") - .verifyResults(new String[] {"acid_table", "table1"}) + .verifyResults(new String[]{"acid_table", "table1"}) .run("select * from table1") - .verifyResults(new String[] {"1", "2", "3"}); + .verifyResults(new String[]{"1", "2", "3"}); + + clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + primary.run("use " + primaryDbName) + .run("create external table ext_table1 (id int)") 
+ .run("insert into ext_table1 values (3)") + .run("insert into ext_table1 values (4)") + .run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)") + .run("show partitions ext_table2") + .verifyResults(new String[]{ + "load_time=2012-02-21 07%3A08%3A09.123", + "load_time=2012-02-21 07%3A08%3A09.124"}) + .dump(primaryDbName, clause); + + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"acid_table", "table1", "ext_table1", "ext_table2"}) + .run("select * from ext_table1") + .verifyResults(new String[]{"3", "4"}) + .run("select value from ext_table2") + .verifyResults(new String[]{"2", "3"}); } @Test @@ -2125,4 +2221,21 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa + NS_REMOTE + "'"); return withClause; } + + /* + * Method used from TestReplicationScenariosExclusiveReplica + */ + private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental, + WarehouseInstance warehouseInstance) + throws IOException { + Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); + Path externalTableInfoFile; + if (isIncremental) { + externalTableInfoFile = new Path(hivePath, FILE_NAME); + } else { + externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); + } + ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 796cfaec17..1f4e20de98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,6 +184,22 @@ private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf throws HiveException { Path basePath = getExternalTableBaseDir(conf); Path targetPath = externalTableDataPath(conf, basePath, sourcePath); + //Here, when src and target are HA clusters with same NS, then sourcePath would have the correct host + //whereas the targetPath would have an host that refers to the target cluster. This is fine for + //data-copy running during dump as the correct logical locations would be used. But if data-copy runs during + //load, then the remote location needs to point to the src cluster from where the data would be copied and + //the common original NS would suffice for targetPath. 
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE) &&
+        hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)) {
+      String remoteNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname);
+      if (StringUtils.isEmpty(remoteNS)) {
+        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
+            .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid "
+                + (remoteNS == null ? "null" : remoteNS), ReplUtils.REPL_HIVE_SERVICE));
+      }
+      targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost()));
+      sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS));
+    }
     fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 71b6041a2d..4ca65cd625 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -94,7 +94,7 @@ public static String replaceNameserviceInEncodedURI(String cmEncodedURI, HiveCon
     return modifiedURI;
   }
 
-  private static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
+  public static String replaceHost(String originalURIStr, String newHost) throws SemanticException {
     if (StringUtils.isEmpty(originalURIStr)) {
       return originalURIStr;
     }
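
Note (not part of the patch): the sketch below illustrates the kind of host/nameservice swap in an HDFS URI that the DirCopyWork planning above relies on via the now-public Utils.replaceHost. It is a minimal, self-contained example, not the Hive implementation; the class name UriHostSwapSketch, the main() driver, and the sample paths are hypothetical (the "nsRemote" alias is simply the value used by the tests above).

// Illustrative sketch only: rebuilds an HDFS URI with its host (the logical nameservice) replaced,
// keeping scheme, port, path, query, and fragment intact.
import java.net.URI;
import java.net.URISyntaxException;

public class UriHostSwapSketch {

  public static String replaceHost(String originalUriStr, String newHost) throws URISyntaxException {
    if (originalUriStr == null || originalUriStr.isEmpty()) {
      return originalUriStr;
    }
    URI original = new URI(originalUriStr);
    // Reassemble the URI component by component, swapping only the host/authority.
    URI swapped = new URI(original.getScheme(), original.getUserInfo(), newHost, original.getPort(),
        original.getPath(), original.getQuery(), original.getFragment());
    return swapped.toString();
  }

  public static void main(String[] args) throws URISyntaxException {
    // When data copy runs on the target during load, the source path must address the remote
    // (source) cluster's nameservice alias, while the target path keeps the shared logical NS.
    String sourcePath = "hdfs://ns1/warehouse/tablespace/external/hive/db1.db/ext_table1";
    System.out.println(replaceHost(sourcePath, "nsRemote"));
    // prints: hdfs://nsRemote/warehouse/tablespace/external/hive/db1.db/ext_table1
  }
}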