diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 985fd8cfa3..1df64b66fc 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -583,13 +583,23 @@ public static Path makeAbsolute(FileSystem fileSystem, Path path) throws IOExcep
    * Copies files between filesystems.
    */
   public static boolean copy(FileSystem srcFS, Path src,
-    FileSystem dstFS, Path dst,
-    boolean deleteSource,
-    boolean overwrite,
-    HiveConf conf) throws IOException {
+                             FileSystem dstFS, Path dst,
+                             boolean deleteSource,
+                             boolean overwrite,
+                             HiveConf conf) throws IOException {
     return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims());
   }
 
+  /**
+   * Copies files between filesystems as a fs super user using distcp, and runs
+   * as a privileged user.
+   */
+  public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst,
+                                       HiveConf conf) throws IOException {
+    String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
+  }
+
   @VisibleForTesting
   static boolean copy(FileSystem srcFS, Path src,
     FileSystem dstFS, Path dst,
@@ -612,18 +622,34 @@ static boolean copy(FileSystem srcFS, Path src,
             HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")");
         LOG.info("Launch distributed copy (distcp) job.");
         triedDistcp = true;
-        copied = shims.runDistCp(src, dst, conf);
-        if (copied && deleteSource) {
-          srcFS.delete(src, true);
-        }
+        copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims);
       }
     }
     if (!triedDistcp) {
+      // Note: Currently, this implementation does not "fall back" to regular copy if distcp
+      // is tried and it fails. We depend upon that behaviour in cases like replication,
+      // wherein if distcp fails, there is good reason to not plod along with a trivial
+      // implementation, and fail instead.
       copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
     }
     return copied;
   }
 
+  static boolean distCp(FileSystem srcFS, Path src, Path dst,
+      boolean deleteSource, String doAsUser,
+      HiveConf conf, HadoopShims shims) throws IOException {
+    boolean copied = false;
+    if (doAsUser == null) {
+      copied = shims.runDistCp(src, dst, conf);
+    } else {
+      copied = shims.runDistCpAs(src, dst, conf, doAsUser);
+    }
+    if (copied && deleteSource) {
+      srcFS.delete(src, true);
+    }
+    return copied;
+  }
+
   /**
    * Move a particular file or directory to the trash.
    * @param fs FileSystem to use
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c37b6e091..8519ff9ba1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2606,6 +2606,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true,
         "Setting this property to true will have HiveServer2 execute\n" +
         "Hive operations as the user making the calls to it."),
+    HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs", "hdfs",
+        "This property allows privileged distcp executions done by hive\n" +
+        "to run as this user. Typically, it should be the user you\n" +
+        "run the namenode as, such as the 'hdfs' user."),
     HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC", "HIVE"),
         "This setting reflects how HiveServer2 will report the table types for JDBC and other\n" +
         "client implementations that retrieve the available tables and supported table types\n" +
@@ -3394,6 +3398,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     "hive.security.authenticator.manager,hive.security.authorization.manager," +
     "hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," +
     "hive.users.in.admin.role,hive.server2.xsrf.filter.enabled,hive.security.authorization.enabled," +
+    "hive.distcp.privileged.doAs," +
     "hive.server2.authentication.ldap.baseDN," +
     "hive.server2.authentication.ldap.url," +
     "hive.server2.authentication.ldap.Domain," +
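
As an illustrative aside (not part of the patch; the paths below are invented), the pieces above combine so that a replication copy escalates like this: privilegedCopy() reads hive.distcp.privileged.doAs (default "hdfs", and protected by the hive.conf.restricted.list addition so a client session cannot override it) and hands that user to the distcp shim.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class PrivilegedCopySketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        Path src = new Path("/warehouse/repl/staging"); // invented paths
        Path dst = new Path("/warehouse/repl/target");
        FileSystem srcFs = src.getFileSystem(conf);
        // Reads HIVE_DISTCP_DOAS_USER ("hdfs" by default) and runs distcp
        // impersonating that user via the shim layer.
        boolean copied = FileUtils.privilegedCopy(srcFs, src, dst, conf);
        System.out.println("copied: " + copied);
      }
    }
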
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
index adc9b0c11a..d3c876103b 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -228,4 +228,26 @@ public void testCopyWithDistcp() throws IOException {
     Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims));
     verify(shims).runDistCp(copySrc, copyDst, conf);
   }
+
+  @Test
+  public void testCopyWithDistCpAs() throws IOException {
+    Path copySrc = new Path("copySrc");
+    Path copyDst = new Path("copyDst");
+    HiveConf conf = new HiveConf(TestFileUtils.class);
+
+    FileSystem fs = copySrc.getFileSystem(conf);
+
+    String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+    HadoopShims shims = mock(HadoopShims.class);
+    when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true);
+    when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false);
+
+    // doAs when asked
+    Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims));
+    verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser);
+    // don't doAs when not asked
+    Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims));
+    verify(shims).runDistCp(copySrc, copyDst, conf);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index d2f9e79cd8..f277284ec6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -139,10 +139,7 @@ protected int execute(DriverContext driverContext) {
         if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
           LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
-          if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
-              false, // delete source
-              true, // overwrite destination
-              conf)) {
+          if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs)) {
             console.printError("Failed to copy: '" + oneSrc.getPath().toString()
                 + "to: '" + toPath.toString() + "'");
             return 1;
@@ -169,6 +166,16 @@ protected int execute(DriverContext driverContext) {
     }
   }
 
+  private boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs) throws IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // regular copy in test env.
+      return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf);
+    } else {
+      // distcp in actual deployment with privilege escalation
+      return FileUtils.privilegedCopy(srcFs, src, dst, conf);
+    }
+  }
+
   private List filesInFileListing(FileSystem fs, Path path)
       throws IOException {
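
The doCopy() split above keeps test environments (where no proxy-user setup exists) on the plain copy path, while real deployments escalate to distcp as the privileged user. A minimal sketch of that decision in isolation (illustrative only; the class and method names are invented):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class CopyModeSketch {
      // Mirrors the branch in ReplCopyTask.doCopy: plain copy under test,
      // privileged distcp otherwise.
      static String chooseCopyMode(HiveConf conf) {
        return conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
            ? "regular FileUtils.copy" : "FileUtils.privilegedCopy (distcp)";
      }

      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
        System.out.println(chooseCopyMode(conf)); // regular FileUtils.copy
      }
    }
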
diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml
index 7c586fab98..3ff1d38776 100644
--- a/shims/0.23/pom.xml
+++ b/shims/0.23/pom.xml
@@ -205,6 +205,14 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
-  <build/>
+  <build>
+    <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/main/test</testSourceDirectory>
+  </build>
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 0483e91c4b..4319bedfd4 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,6 +27,7 @@
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,6 +38,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -1081,16 +1084,53 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy)
     }
   }
 
+  private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+
+  List<String> constructDistCpParams(Path src, Path dst, Configuration conf) {
+    List<String> params = new ArrayList<String>();
+    for (Map.Entry<String, String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()) {
+      String distCpOption = entry.getKey();
+      String distCpVal = entry.getValue();
+      params.add("-" + distCpOption);
+      if ((distCpVal != null) && (!distCpVal.isEmpty())) {
+        params.add(distCpVal);
+      }
+    }
+    if (params.size() == 0) {
+      // if no entries were added via conf, we initiate our defaults
+      params.add("-update");
+      params.add("-skipcrccheck");
+    }
+    params.add(src.toString());
+    params.add(dst.toString());
+    return params;
+  }
+
   @Override
-  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
+  public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException {
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+        doAsUser, UserGroupInformation.getLoginUser());
+    try {
+      return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          return runDistCp(src, dst, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
     DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst);
     options.setSyncFolder(true);
     options.setSkipCRC(true);
     options.preserve(FileAttribute.BLOCKSIZE);
 
     // Creates the command-line parameters for distcp
-    String[] params = {"-update", "-skipcrccheck", src.toString(), dst.toString()};
+    List<String> params = constructDistCpParams(src, dst, conf);
 
     try {
       conf.setBoolean("mapred.mapper.new-api", true);
@@ -1098,7 +1138,7 @@ public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOExcept
 
       // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
      // added by HADOOP-10459
-      if (distcp.run(params) == 0) {
+      if (distcp.run(params.toArray(new String[0])) == 0) {
         return true;
       } else {
         return false;
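
The distcp.options.* mechanism above turns arbitrary configuration entries into distcp command-line flags, replacing the previously hardcoded "-update -skipcrccheck". A sketch of how it behaves (illustrative; the class is invented and must live in org.apache.hadoop.hive.shims because constructDistCpParams is package-private, and the option values are just examples of real distcp flags):

    package org.apache.hadoop.hive.shims;

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class DistCpParamsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("distcp.options.m", "20");   // becomes "-m 20" (max maps)
        conf.set("distcp.options.pugpb", ""); // empty value: bare flag "-pugpb"
        List<String> params = new Hadoop23Shims()
            .constructDistCpParams(new Path("/src"), new Path("/dst"), conf);
        // Prints e.g. [-m, 20, -pugpb, /src, /dst]; option order is not
        // guaranteed, and the "-update -skipcrccheck" defaults are dropped
        // as soon as any distcp.options.* entry is present.
        System.out.println(params);
      }
    }
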
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
new file mode 100644
index 0000000000..e672233609
--- /dev/null
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHadoop23Shims {
+
+  @Test
+  public void testConstructDistCpParams() {
+    Path copySrc = new Path("copySrc");
+    Path copyDst = new Path("copyDst");
+    Configuration conf = new Configuration();
+
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf);
+
+    assertEquals(4, paramsDefault.size());
+    assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+    assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck"));
+    assertEquals(copySrc.toString(), paramsDefault.get(2));
+    assertEquals(copyDst.toString(), paramsDefault.get(3));
+
+    conf.set("distcp.options.foo", "bar"); // should set "-foo bar"
+    conf.set("distcp.options.blah", ""); // should set "-blah"
+    conf.set("dummy", "option"); // should be ignored.
+    List<String> paramsWithCustomParamInjection =
+        shims.constructDistCpParams(copySrc, copyDst, conf);
+
+    assertEquals(5, paramsWithCustomParamInjection.size());
+
+    // check that the defaults did not remain.
+    assertTrue("Distcp -update not set if not requested",
+        !paramsWithCustomParamInjection.contains("-update"));
+    assertTrue("Distcp -skipcrccheck not set if not requested",
+        !paramsWithCustomParamInjection.contains("-skipcrccheck"));
+
+    // the "-foo bar" and "-blah" params order is not guaranteed
+    String firstParam = paramsWithCustomParamInjection.get(0);
+    if (firstParam.equals("-foo")) {
+      // "-foo bar -blah" form
+      assertEquals("bar", paramsWithCustomParamInjection.get(1));
+      assertEquals("-blah", paramsWithCustomParamInjection.get(2));
+    } else {
+      // "-blah -foo bar" form
+      assertEquals("-blah", paramsWithCustomParamInjection.get(0));
+      assertEquals("-foo", paramsWithCustomParamInjection.get(1));
+      assertEquals("bar", paramsWithCustomParamInjection.get(2));
+    }
+
+    // the dummy option should not have made it either - only options
+    // beginning with distcp.options. should be honoured
+    assertTrue(!paramsWithCustomParamInjection.contains("dummy"));
+    assertTrue(!paramsWithCustomParamInjection.contains("-dummy"));
+    assertTrue(!paramsWithCustomParamInjection.contains("option"));
+    assertTrue(!paramsWithCustomParamInjection.contains("-option"));
+
+    assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
+    assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+
+  }
+
+}
+ assertTrue("Distcp -update not set if not requested", + !paramsWithCustomParamInjection.contains("-update")); + assertTrue("Distcp -skipcrccheck not set if not requested", + !paramsWithCustomParamInjection.contains("-skipcrccheck")); + + // the "-foo bar" and "-blah" params order is not guaranteed + String firstParam = paramsWithCustomParamInjection.get(0); + if (firstParam.equals("-foo")){ + // "-foo bar -blah" form + assertEquals("bar", paramsWithCustomParamInjection.get(1)); + assertEquals("-blah", paramsWithCustomParamInjection.get(2)); + } else { + // "-blah -foo bar" form + assertEquals("-blah", paramsWithCustomParamInjection.get(0)); + assertEquals("-foo", paramsWithCustomParamInjection.get(1)); + assertEquals("bar", paramsWithCustomParamInjection.get(2)); + } + + // the dummy option should not have made it either - only options + // beginning with distcp.options. should be honoured + assertTrue(!paramsWithCustomParamInjection.contains("dummy")); + assertTrue(!paramsWithCustomParamInjection.contains("-dummy")); + assertTrue(!paramsWithCustomParamInjection.contains("option")); + assertTrue(!paramsWithCustomParamInjection.contains("-option")); + + assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3)); + assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4)); + + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 9c6901d17c..aba4a5c8e2 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -480,6 +480,20 @@ public void checkFileAccess(FileSystem fs, FileStatus status, FsAction action) /** * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes. * This distributed process is meant to copy huge files that could take some time if a single + * copy is done. This is a variation which allows proxying as a different user to perform + * the distcp, and requires that the caller have requisite proxy user privileges. + * + * @param src Path to the source file or directory to copy + * @param dst Path to the destination file or directory + * @param conf The hadoop configuration object + * @param doAsUser The user to perform the distcp as + * @return True if it is successfull; False otherwise. + */ + public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException; + + /** + * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes. + * This distributed process is meant to copy huge files that could take some time if a single * copy is done. * * @param src Path to the source file or directory to copy