diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index bf600c2729..eeb6e58b54 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -135,7 +135,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-cli</artifactId>
       <version>${project.version}</version>
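The test-scoped hadoop-distcp dependency puts org.apache.hadoop.tools.DistCp on the test classpath: the mini-cluster test below deliberately pushes CopyUtils onto the distcp path, and without this artifact that path would fail only at runtime. For orientation, a minimal sketch of driving DistCp programmatically, assuming the Hadoop 2.x DistCpOptions API that this branch builds against; the wrapper class itself is hypothetical and not part of this patch:

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

// Hypothetical helper: shows what runs under the hood once CopyUtils decides
// a copy is too large for a regular filesystem copy.
public class DistCpSketch {
  public static void copy(Configuration conf, Path source, Path target) throws Exception {
    DistCpOptions options = new DistCpOptions(Collections.singletonList(source), target);
    options.setSyncFolder(true); // copy only what differs at the target
    new DistCp(conf, options).execute(); // submits the MR job and waits for it
  }
}
```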
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
new file mode 100644
index 0000000000..3989d37a45
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -0,0 +1,119 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim;
+
+public class TestCopyUtils {
+  @Rule
+  public final TestName testName = new TestName();
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestCopyUtils.class);
+
+  static class WarehouseInstanceWithMR extends WarehouseInstance {
+
+    MiniMrShim mrCluster;
+
+    WarehouseInstanceWithMR(Logger logger, MiniDFSCluster cluster,
+        Map<String, String> overridesForHiveConf) throws Exception {
+      super(logger, cluster, overridesForHiveConf);
+      HadoopShims shims = ShimLoader.getHadoopShims();
+      mrCluster = shims.getLocalMiniTezCluster(hiveConf, false);
+      // mrCluster = shims.getMiniMrCluster(hiveConf, 2,
+      //     miniDFSCluster.getFileSystem().getUri().toString(), 1);
+      mrCluster.setupConfiguration(hiveConf);
+    }
+
+    @Override
+    public void close() throws IOException {
+      mrCluster.shutdown();
+      super.close();
+    }
+  }
+
+  private static WarehouseInstanceWithMR primary, replica;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+
+    UserGroupInformation ugi = Utils.getUGI();
+    String currentUser = ugi.getShortUserName();
+
+    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+      put(ConfVars.HIVE_IN_TEST.varname, "false");
+      put(ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1");
+      put(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+      put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
+    }};
+    primary = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+    replica = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
+
+  private String primaryDbName, replicatedDbName;
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  /**
+   * We need two separate insert statements because we want the table to have two different
+   * data files. This is required since one of the conditions for distcp to be invoked is
+   * that there is more than one file to copy.
+   */
+  @Test
+  public void testPrivilegedDistCpWithSameUserAsCurrentDoesNotTryToImpersonate() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into t1 values (1),(2),(3)")
+        .run("insert into t1 values (11),(12),(13)")
+        .dump(primaryDbName, null);
+
+    /*
+      We have to compare the data of table t1 in the replicated database because even though
+      the file copy fails due to an impersonation failure, the driver still returns success
+      code 0. Maybe something to look at later.
+    */
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(Arrays.asList("1", "2", "3", "12", "11", "13"));
+  }
+}
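The test above hinges on CopyUtils choosing distcp instead of a plain filesystem copy, which is why HIVE_EXEC_COPYFILE_MAXSIZE is forced down to one byte and the table is given two data files. A simplified sketch of the shape of that decision follows; the real check is CopyUtils#regularCopy, which also consults HIVE_IN_TEST and whether the filesystems involved are local:

```java
import java.util.List;

import org.apache.hadoop.fs.FileStatus;

// Simplified sketch of the regular-copy-vs-distcp decision the test exercises.
// The thresholds mirror hive.exec.copyfile.maxsize and hive.exec.copyfile.maxnumfiles.
public class CopyDecisionSketch {
  public static boolean useRegularCopy(List<FileStatus> sources, long maxBytes, long maxFiles) {
    long totalBytes = 0;
    for (FileStatus source : sources) {
      totalBytes += source.getLen();
    }
    // Too many files or too many bytes: hand the work to distcp.
    return sources.size() <= maxFiles && totalBytes <= maxBytes;
  }
}
```

With the maximum size overridden to 1 byte and two data files on disk, both conditions fail and the copy lands on the distcp branch, which is the code path under test.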
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
index 1f19dfd19e..70a57f833c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -30,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 
 public class TestExportImport {
 
@@ -48,8 +50,12 @@ public static void classLevelSetup() throws Exception {
     conf.set("dfs.client.use.datanode.hostname", "true");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    srcHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
-    destHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
+    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+    }};
+    srcHiveWarehouse =
+        new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    destHiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
   }
 
   @AfterClass
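Both test classes now pass configuration as a map, which the WarehouseInstance change below copies verbatim onto the HiveConf before start-up. A sketch of the resulting call pattern, as a hypothetical setup body with LOG and miniDFSCluster as in the tests above:

```java
// Any HiveConf knob can now be overridden per warehouse, instead of only
// HIVE_IN_TEST through the old boolean parameter.
Map<String, String> overrides = new HashMap<String, String>() {{
  put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
  put(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, "1"); // force the distcp path
}};
WarehouseInstance warehouse = new WarehouseInstance(LOG, miniDFSCluster, overrides);
```

The double-brace initializer creates an anonymous HashMap subclass that captures the enclosing scope, which is what lets TestCopyUtils reference the local currentUser inside its override block.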
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index c084d4db34..19ad442894 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -46,10 +46,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 class WarehouseInstance implements Closeable {
@@ -64,7 +67,8 @@ Licensed to the Apache Software Foundation (ASF) under one
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster, boolean hiveInTests) throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf)
+      throws Exception {
     this.logger = logger;
     this.miniDFSCluster = cluster;
     assert miniDFSCluster.isClusterUp();
@@ -74,16 +78,22 @@ Licensed to the Apache Software Foundation (ASF) under one
     Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
-    initialize(cmRootPath.toString(), warehouseRoot.toString(), hiveInTests);
+    initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
-    this(logger, cluster, true);
+    this(logger, cluster, new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+    }});
   }
 
-  private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
+  private void initialize(String cmRoot, String warehouseRoot,
+      Map<String, String> overridesForHiveConf)
       throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
+    for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
+      hiveConf.set(entry.getKey(), entry.getValue());
+    }
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
     String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR
@@ -95,7 +105,7 @@ private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
       return;
     }
 
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
+    // hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
     // turn on db notification listener on meta store
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot);
     hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
@@ -107,7 +117,7 @@ private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
     hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
     hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
-            hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+        hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -184,39 +194,51 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro
     return this;
   }
 
-    WarehouseInstance verifyResult (String data) throws IOException {
-      verifyResults(data == null ? new String[] {} : new String[] { data });
-      return this;
-    }
+  WarehouseInstance verifyResult(String data) throws IOException {
+    verifyResults(data == null ? new String[] {} : new String[] { data });
+    return this;
+  }
 
-    /**
-     * All the results that are read from the hive output will not preserve
-     * case sensitivity and will all be in lower case, hence we will check against
-     * only lower case data values.
-     * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
-     * before assert.
-     */
-    WarehouseInstance verifyResults(String[] data) throws IOException {
-      List<String> results = getOutput();
-      logger.info("Expecting {}", StringUtils.join(data, ","));
-      logger.info("Got {}", results);
-      assertEquals(data.length, results.size());
-      for (int i = 0; i < data.length; i++) {
-        assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
-      }
-      return this;
+  /**
+   * Results read from the hive output do not preserve case sensitivity and come back in
+   * lower case, so we check against lower-cased data values only. Null values are the
+   * exception: they are returned in upper case, hence the explicit lower-casing before
+   * the assert.
+   */
+  WarehouseInstance verifyResults(String[] data) throws IOException {
+    List<String> results = getOutput();
+    logger.info("Expecting {}", StringUtils.join(data, ","));
+    logger.info("Got {}", results);
+    assertEquals(data.length, results.size());
+    for (int i = 0; i < data.length; i++) {
+      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
     }
+    return this;
+  }
 
-    List<String> getOutput() throws IOException {
-      List<String> results = new ArrayList<>();
-      try {
-        driver.getResults(results);
-      } catch (CommandNeedRetryException e) {
-        logger.warn(e.getMessage(), e);
-        throw new RuntimeException(e);
-      }
-      return results;
+  /**
+   * Verifies results without regard for ordering.
+   */
+  WarehouseInstance verifyResults(List<String> data) throws IOException {
+    List<String> results = getOutput();
+    logger.info("Expecting {}", StringUtils.join(data, ","));
+    logger.info("Got {}", results);
+    assertEquals(data.size(), results.size());
+    assertTrue(results.containsAll(data));
+    return this;
+  }
+
+  List<String> getOutput() throws IOException {
+    List<String> results = new ArrayList<>();
+    try {
+      driver.getResults(results);
+    } catch (CommandNeedRetryException e) {
+      logger.warn(e.getMessage(), e);
+      throw new RuntimeException(e);
     }
+    return results;
+  }
 
   private void printOutput() throws IOException {
     for (String s : getOutput()) {
@@ -226,7 +248,6 @@ private void printOutput() throws IOException {
 
   ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
     return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
-
   }
 
   @Override
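The new List overload of verifyResults checks membership rather than position, which TestCopyUtils needs because HDFS does not guarantee a stable enumeration order for the two data files. A hypothetical contrast of the two overloads; the query and table name are illustrative:

```java
// String[] overload: positional, case-insensitive comparison.
warehouse.run("select id from t1 order by id")
    .verifyResults(new String[] { "1", "2", "3" });
// List overload: size equality plus containsAll, so ordering is ignored.
warehouse.run("select id from t1")
    .verifyResults(Arrays.asList("3", "1", "2"));
```

One caveat of the size-plus-containsAll check: it cannot distinguish differing duplicate counts (expected [1, 1, 2] also accepts results [1, 2, 2]), which is harmless here because the test values are distinct.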
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 28e7bcb8c4..a022b5d355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -25,9 +25,12 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -52,10 +55,14 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.copyAsUser = distCpDoAsUser;
   }
 
-  public void doCopy(Path destination, List<Path> srcPaths) throws IOException {
+  public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
     Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths);
     FileSystem destinationFs = destination.getFileSystem(hiveConf);
 
+    UserGroupInformation ugi = Utils.getUGI();
+    String currentUser = ugi.getShortUserName();
+    boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
+
     for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
       if (regularCopy(destinationFs, entry)) {
         Path[] paths = entry.getValue().toArray(new Path[] {});
@@ -66,7 +73,7 @@ public void doCopy(Path destination, List<Path> srcPaths) throws IOException {
             entry.getValue(),    // list of source paths
             destination,
             false,
-            copyAsUser,
+            usePrivilegedDistCp ? copyAsUser : null,
             hiveConf,
             ShimLoader.getHadoopShims()
         );
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 3ae07f1580..2f636b626c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -30,6 +30,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.login.LoginException;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -54,7 +55,7 @@ public FileOperations(Path dataFileListPath, Path exportRootDataDir,
     exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
   }
 
-  public void export(ReplicationSpec forReplicationSpec) throws IOException, SemanticException {
+  public void export(ReplicationSpec forReplicationSpec) throws Exception {
     if (forReplicationSpec.isLazy()) {
       exportFilesAsList();
     } else {
@@ -65,7 +66,7 @@ public void export(ReplicationSpec forReplicationSpec) throws IOException, Seman
   /**
    * This writes the actual data in the exportRootDataDir from the source.
    */
-  private void copyFiles() throws IOException {
+  private void copyFiles() throws IOException, LoginException {
     FileStatus[] fileStatuses =
         LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath);
     List<Path> srcPaths = new ArrayList<>();
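Net effect of the CopyUtils and FileOperations changes: the LoginException from resolving the current UGI is propagated out through export(), and distcp is handed a doAs user only when that user differs from the one already running. A condensed restatement of the guard, using the same shim call as the patch; the wrapper class is hypothetical:

```java
import java.io.IOException;

import javax.security.auth.login.LoginException;

import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;

// Condensed restatement of the check added in CopyUtils#doCopy.
public class DoAsGuardSketch {
  // Returns the user distcp should impersonate, or null for "run as yourself".
  public static String effectiveDoAsUser(String copyAsUser) throws IOException, LoginException {
    UserGroupInformation ugi = Utils.getUGI();
    String currentUser = ugi.getShortUserName();
    boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
    return usePrivilegedDistCp ? copyAsUser : null;
  }
}
```

This is exactly what testPrivilegedDistCpWithSameUserAsCurrentDoesNotTryToImpersonate pins down: with HIVE_DISTCP_DOAS_USER set to the current user, no proxy user is requested and the copy succeeds.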