diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index bf600c2729..eeb6e58b54 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -135,7 +135,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
-
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-cli</artifactId>
       <version>${project.version}</version>
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
new file mode 100644
index 0000000000..3989d37a45
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -0,0 +1,119 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
+
+public class TestCopyUtils {
+ @Rule
+ public final TestName testName = new TestName();
+
+ @Rule
+ public TestRule replV1BackwardCompat;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestCopyUtils.class);
+
+ static class WarehouseInstanceWithMR extends WarehouseInstance {
+
+ MiniMrShim mrCluster;
+
+ WarehouseInstanceWithMR(Logger logger, MiniDFSCluster cluster,
+ Map<String, String> overridesForHiveConf) throws Exception {
+ super(logger, cluster, overridesForHiveConf);
+ HadoopShims shims = ShimLoader.getHadoopShims();
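+ // Distcp needs a functioning execution cluster to launch its copy job;
+ // the base WarehouseInstance only brings up HDFS.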
+ mrCluster = shims.getLocalMiniTezCluster(hiveConf, false);
+ // mrCluster = shims.getMiniMrCluster(hiveConf, 2,
+ // miniDFSCluster.getFileSystem().getUri().toString(), 1);
+
+ mrCluster.setupConfiguration(hiveConf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ mrCluster.shutdown();
+ super.close();
+ }
+ }
+
+ private static WarehouseInstanceWithMR primary, replica;
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("dfs.client.use.datanode.hostname", "true");
+
+ MiniDFSCluster miniDFSCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+
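+ // The overrides force the distcp code path: HIVE_IN_TEST is turned off and the
+ // regular-copy size threshold is set to 1 byte, while HIVE_DISTCP_DOAS_USER points at
+ // the current user so distcp should not try to impersonate anyone.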
+ HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+ put(ConfVars.HIVE_IN_TEST.varname, "false");
+ put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1");
+ put(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+ put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+ }};
+ primary = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+ replica = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+ }
+
+ @AfterClass
+ public static void classLevelTearDown() throws IOException {
+ primary.close();
+ replica.close();
+ }
+
+ private String primaryDbName, replicatedDbName;
+
+ @Before
+ public void setup() throws Throwable {
+ replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
+ primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+ replicatedDbName = "replicated_" + primaryDbName;
+ primary.run("create database " + primaryDbName);
+ }
+
+ /**
+ * We need two separate insert statements because we want the table to have two different data files.
+ * This is required as one of the conditions for distcp to get invoked is to have more than one file.
+ */
+ @Test
+ public void testPrivilegedDistCpWithSameUserAsCurrentDoesNotTryToImpersonate() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into t1 values (1),(2),(3)")
+ .run("insert into t1 values (11),(12),(13)")
+ .dump(primaryDbName, null);
+
+ /*
+ We have to compare the data of table t1 in the replicated database because, even though the file
+ copy fails due to an impersonation failure, the driver still returns success code 0. This may be something to look at later.
+ */
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .run("select * from " + replicatedDbName + ".t1")
+ .verifyResults(Arrays.asList("1", "2", "3", "12", "11", "13"));
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 1f19dfd19e..70a57f833c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -30,6 +31,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
public class TestExportImport {
@@ -48,8 +50,12 @@ public static void classLevelSetup() throws Exception {
conf.set("dfs.client.use.datanode.hostname", "true");
MiniDFSCluster miniDFSCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- srcHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
- destHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
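+ // Run both warehouses with HIVE_IN_TEST=false so export/import goes through the
+ // same copy code paths that are used outside of unit tests.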
+ HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+ }};
+ srcHiveWarehouse =
+ new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+ destHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
}
@AfterClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index c084d4db34..19ad442894 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -46,10 +46,13 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
class WarehouseInstance implements Closeable {
@@ -64,7 +67,8 @@ Licensed to the Apache Software Foundation (ASF) under one
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
- WarehouseInstance(Logger logger, MiniDFSCluster cluster, boolean hiveInTests) throws Exception {
+ WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf)
+ throws Exception {
this.logger = logger;
this.miniDFSCluster = cluster;
assert miniDFSCluster.isClusterUp();
@@ -74,16 +78,22 @@ Licensed to the Apache Software Foundation (ASF) under one
Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
- initialize(cmRootPath.toString(), warehouseRoot.toString(), hiveInTests);
+ initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
}
WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
- this(logger, cluster, true);
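+ // Preserve the old default: a plain WarehouseInstance still runs with HIVE_IN_TEST=true.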
+ this(logger, cluster, new HashMap<String, String>() {{
+ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+ }});
}
- private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
+ private void initialize(String cmRoot, String warehouseRoot,
+ Map<String, String> overridesForHiveConf)
throws Exception {
hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
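+ // Apply the caller-supplied HiveConf overrides (for example HIVE_IN_TEST) for this instance.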
+ for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
+ hiveConf.set(entry.getKey(), entry.getValue());
+ }
String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
+ Path.SEPARATOR
@@ -95,7 +105,7 @@ private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
return;
}
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
+ // hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
// turn on db notification listener on meta store
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot);
hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
@@ -107,7 +117,7 @@ private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
"jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
- hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+ hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -184,39 +194,51 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro
return this;
}
- WarehouseInstance verifyResult (String data) throws IOException {
- verifyResults(data == null ? new String[] {} : new String[] { data });
- return this;
- }
+ WarehouseInstance verifyResult(String data) throws IOException {
+ verifyResults(data == null ? new String[] {} : new String[] { data });
+ return this;
+ }
- /**
- * All the results that are read from the hive output will not preserve
- * case sensitivity and will all be in lower case, hence we will check against
- * only lower case data values.
- * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
- * before assert.
- */
- WarehouseInstance verifyResults(String[] data) throws IOException {
- List<String> results = getOutput();
- logger.info("Expecting {}", StringUtils.join(data, ","));
- logger.info("Got {}", results);
- assertEquals(data.length, results.size());
- for (int i = 0; i < data.length; i++) {
- assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
- }
- return this;
+ /**
+ * Results read from the hive output do not preserve case sensitivity and come back
+ * in lower case, hence we check against lower-case data values only.
+ * The exception is NULL values, which are returned in upper case, hence both sides are
+ * explicitly lower-cased before the assert.
+ */
+ WarehouseInstance verifyResults(String[] data) throws IOException {
+ List<String> results = getOutput();
+ logger.info("Expecting {}", StringUtils.join(data, ","));
+ logger.info("Got {}", results);
+ assertEquals(data.length, results.size());
+ for (int i = 0; i < data.length; i++) {
+ assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
}
+ return this;
+ }
- List<String> getOutput() throws IOException {
- List<String> results = new ArrayList<>();
- try {
- driver.getResults(results);
- } catch (CommandNeedRetryException e) {
- logger.warn(e.getMessage(), e);
- throw new RuntimeException(e);
- }
- return results;
+ /**
+ * Verifies results without regard for ordering.
+ */
+ WarehouseInstance verifyResults(List<String> data) throws IOException {
+ List<String> results = getOutput();
+ logger.info("Expecting {}", StringUtils.join(data, ","));
+ logger.info("Got {}", results);
+ assertEquals(data.size(), results.size());
+ assertTrue(results.containsAll(data));
+ return this;
+ }
+
+ List<String> getOutput() throws IOException {
+ List<String> results = new ArrayList<>();
+ try {
+ driver.getResults(results);
+ } catch (CommandNeedRetryException e) {
+ logger.warn(e.getMessage(), e);
+ throw new RuntimeException(e);
}
+ return results;
+ }
private void printOutput() throws IOException {
for (String s : getOutput()) {
@@ -226,7 +248,6 @@ private void printOutput() throws IOException {
ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
-
}
@Override
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 3053dcb50b..be52d6a371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -8323,7 +8323,8 @@ public void run() throws MetaException {
if (currentRetries >= maxRetries) {
String message =
"Couldn't acquire the DB log notification lock because we reached the maximum"
- + " # of retries: {} retries. If this happens too often, then is recommended to "
+ + " # of retries: " + maxRetries
+ + " retries. If this happens too often, then is recommended to "
+ "increase the maximum number of retries on the"
+ " hive.notification.sequence.lock.max.retries configuration";
LOG.error(message, e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 28e7bcb8c4..a022b5d355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -25,9 +25,12 @@
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -52,10 +55,14 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
this.copyAsUser = distCpDoAsUser;
}
- public void doCopy(Path destination, List<Path> srcPaths) throws IOException {
+ public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths);
FileSystem destinationFs = destination.getFileSystem(hiveConf);
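+ // Only ask distcp to impersonate the configured doAs user when it differs from the
+ // user we are already running as; impersonating the current user fails unless
+ // proxy-user privileges are configured.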
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+ boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
+
for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
if (regularCopy(destinationFs, entry)) {
Path[] paths = entry.getValue().toArray(new Path[] {});
@@ -66,7 +73,7 @@ public void doCopy(Path destination, List<Path> srcPaths) throws IOException {
entry.getValue(), // list of source paths
destination,
false,
- copyAsUser,
+ usePrivilegedDistCp ? copyAsUser : null,
hiveConf,
ShimLoader.getHadoopShims()
);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 3ae07f1580..2f636b626c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -30,6 +30,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.auth.login.LoginException;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -54,7 +55,7 @@ public FileOperations(Path dataFileListPath, Path exportRootDataDir,
exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
}
- public void export(ReplicationSpec forReplicationSpec) throws IOException, SemanticException {
+ public void export(ReplicationSpec forReplicationSpec) throws Exception {
if (forReplicationSpec.isLazy()) {
exportFilesAsList();
} else {
@@ -65,7 +66,7 @@ public void export(ReplicationSpec forReplicationSpec) throws IOException, Seman
/**
* This writes the actual data in the exportRootDataDir from the source.
*/
- private void copyFiles() throws IOException {
+ private void copyFiles() throws IOException, LoginException {
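+ // LoginException propagates from the UGI lookup performed during the copy (see CopyUtils.doCopy).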
FileStatus[] fileStatuses =
LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath);
List<Path> srcPaths = new ArrayList<>();