diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 985fd8cfa3..4316350a52 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -619,6 +619,10 @@ static boolean copy(FileSystem srcFS, Path src,
}
}
if (!triedDistcp) {
+ // Note : Currently, this implementation does not "fall back" to regular copy if distcp
+ // is tried and it fails. We depend upon that behaviour in cases like replication,
+ // wherein if distcp fails, there is good reason to not plod along with a trivial
+ // implementation, and fail instead.
copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
}
return copied;
diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml
index 7c586fab98..3ff1d38776 100644
--- a/shims/0.23/pom.xml
+++ b/shims/0.23/pom.xml
@@ -205,6 +205,14 @@
${hadoop.version}
provided
+
+ junit
+ junit
+ test
+
-
+
+ ${basedir}/src/main/java
+ ${basedir}/src/main/test
+
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 0483e91c4b..b7dc9e1334 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -37,6 +38,8 @@
import java.util.Set;
import java.util.TreeMap;
import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -1081,6 +1084,32 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy)
}
}
+ private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+ private static final String PRIVILEGED_USER = "privilegedUser";
+
+ List constructDistCpParams(Path src, Path dst, Configuration conf) {
+ List params = new ArrayList();
+ for (Map.Entry entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+ String distCpOption = entry.getKey();
+ String distCpVal = entry.getValue();
+ if (! distCpOption.equals(PRIVILEGED_USER)){
+ // privileged user setting is not passed on to the cmdline args
+ params.add("-" + distCpOption);
+ if ((distCpVal != null) && (!distCpVal.isEmpty())){
+ params.add(distCpVal);
+ }
+ }
+ }
+ if (params.size() == 0){
+ // if no entries were added via conf, we initiate our defaults
+ params.add("-update");
+ params.add("-skipcrccheck");
+ }
+ params.add(src.toString());
+ params.add(dst.toString());
+ return params;
+ }
+
@Override
public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException {
@@ -1090,15 +1119,59 @@ public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOExcept
options.preserve(FileAttribute.BLOCKSIZE);
// Creates the command-line parameters for distcp
- String[] params = {"-update", "-skipcrccheck", src.toString(), dst.toString()};
+ List params = constructDistCpParams(src, dst, conf);
+
+ // determine if impersonation is required - default is false.
+ // if we determine that impersonation is required, current use
+ // must be configured to be capable of doing user impersonation.
+ // we first check if distcp.options.privilegedUser is set. If not
+ // set or if the user is the same as the current user, we do not
+ // attempt to impersonate.
+ String privilegedUser = conf.get(DISTCP_OPTIONS_PREFIX + PRIVILEGED_USER);
+ try {
+ if (privilegedUser != null){
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+ if (!currentUser.equals(privilegedUser)){
+ return runDistCpWithImpersonation(src, dst, conf, options, params, privilegedUser);
+ }
+ }
+ } catch (LoginException le) {
+ throw new IOException(le);
+ }
+
+ // if we're here, then we did not impersonate. run as-is.
+ return runDistCp(src, dst, conf, options, params);
+
+ }
+
+ boolean runDistCpWithImpersonation(
+ final Path src, final Path dst, final Configuration conf,
+ final DistCpOptions options, final List params,
+ String impersonatedUser) throws IOException {
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+ impersonatedUser, UserGroupInformation.getLoginUser());
+ try {
+ return proxyUser.doAs(new PrivilegedExceptionAction() {
+ @Override
+ public Boolean run() throws Exception {
+ return runDistCp(src, dst, conf, options, params);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ boolean runDistCp(Path src, Path dst, Configuration conf,
+ DistCpOptions options, List params) throws IOException {
try {
conf.setBoolean("mapred.mapper.new-api", true);
DistCp distcp = new DistCp(conf, options);
// HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue
// added by HADOOP-10459
- if (distcp.run(params) == 0) {
+ if (distcp.run(params.toArray(new String[0])) == 0) {
return true;
} else {
return false;
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
new file mode 100644
index 0000000000..c8acb864a8
--- /dev/null
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHadoop23Shims {
+
+ @Test
+ public void testConstructDistCpParams() {
+ Path copySrc = new Path("copySrc");
+ Path copyDst = new Path("copyDst");
+ Configuration conf = new Configuration();
+
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf);
+
+ assertEquals(4, paramsDefault.size());
+ assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+ assertTrue("Distcp -skipcrccheck set by default", paramsDefault.contains("-skipcrccheck"));
+ assertEquals(copySrc.toString(), paramsDefault.get(2));
+ assertEquals(copyDst.toString(), paramsDefault.get(3));
+
+ conf.set("distcp.options.foo","bar"); // should set "-foo bar"
+ conf.set("distcp.options.blah",""); // should set "-blah"
+ conf.set("dummy","option"); // should be ignored.
+ List paramsWithCustomParamInjection =
+ shims.constructDistCpParams(copySrc, copyDst, conf);
+
+ assertEquals(5, paramsWithCustomParamInjection.size());
+
+ // check that the defaults did not remain.
+ assertTrue("Distcp -update not set if not requested",
+ !paramsWithCustomParamInjection.contains("-update"));
+ assertTrue("Distcp -skipcrccheck not set if not requested",
+ !paramsWithCustomParamInjection.contains("-skipcrccheck"));
+
+ // the "-foo bar" and "-blah" params order is not guaranteed
+ String firstParam = paramsWithCustomParamInjection.get(0);
+ if (firstParam.equals("-foo")){
+ // "-foo bar -blah" form
+ assertEquals("bar", paramsWithCustomParamInjection.get(1));
+ assertEquals("-blah", paramsWithCustomParamInjection.get(2));
+ } else {
+ // "-blah -foo bar" form
+ assertEquals("-blah", paramsWithCustomParamInjection.get(0));
+ assertEquals("-foo", paramsWithCustomParamInjection.get(1));
+ assertEquals("bar", paramsWithCustomParamInjection.get(2));
+ }
+
+ // the dummy option should not have made it either - only options
+ // beginning with distcp.options. should be honoured
+ assertTrue(!paramsWithCustomParamInjection.contains("dummy"));
+ assertTrue(!paramsWithCustomParamInjection.contains("-dummy"));
+ assertTrue(!paramsWithCustomParamInjection.contains("option"));
+ assertTrue(!paramsWithCustomParamInjection.contains("-option"));
+
+ assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
+ assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+
+ }
+
+ @Test
+ public void testRunDistCpWithImpersonation() {
+
+ Hadoop23Shims shims = new Hadoop23Shims();
+ Hadoop23Shims mock23Shims = spy(shims);
+
+ String impersonatedUser = "impersonatedUser";
+
+ try {
+
+ doReturn(true).when(mock23Shims).
+ runDistCpWithImpersonation(
+ any(Path.class),
+ any(Path.class),
+ any(Configuration.class),
+ any(DistCpOptions.class),
+ any(List.class),
+ any(String.class));
+ doReturn(false).when(mock23Shims).
+ runDistCp(
+ any(Path.class),
+ any(Path.class),
+ any(Configuration.class),
+ any(DistCpOptions.class),
+ any(List.class));
+
+ } catch (IOException e) {
+ assertNull("Unexpected exception", e);
+ }
+
+ Path copySrc = new Path("copySrc");
+ Path copyDst = new Path("copyDst");
+ Configuration conf = new Configuration();
+
+ try {
+ assertFalse("Impersonation should not happen when not requested",mock23Shims.runDistCp(copySrc,copyDst,conf));
+ String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ conf.set("distcp.options.privilegedUser", currentUser);
+ assertFalse("Impersonation should not happen when current user itself is used",mock23Shims.runDistCp(copySrc,copyDst,conf));
+ conf.set("distcp.options.privilegedUser", impersonatedUser);
+ assertTrue("Impersonation should happen when requested", mock23Shims.runDistCp(copySrc, copyDst, conf));
+
+ } catch (IOException e) {
+ assertNull("Unexpected exception",e);
+ }
+ }
+
+}