diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0564b50276..631c83644a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -470,6 +470,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + "metadata for acid tables which do not require the corresponding transaction \n"
         + "semantics to be applied on target. This can be removed when ACID table \n"
         + "replication is supported."),
+    //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+    REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
+        "For TDE with same encryption keys on source and target, allow Distcp super user to access \n"
+            + "the raw bytes from filesystem without decrypting on source and then encrypting on target."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
new file mode 100644
index 0000000000..fd05e99137
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
+
+public class TestReplicationOnHDFSEncryptedZones {
+  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+  private static WarehouseInstance primary;
+  private static String primaryDbName, replicatedDbName;
+  private static Configuration conf;
+  private static MiniDFSCluster miniDFSCluster;
+
+  @BeforeClass
+  public static void beforeClassSetup() throws Exception {
+    conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+    conf.setBoolean("dfs.namenode.delegation.token.always-use", true);
+
+    conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname, 1);
+    conf.setLong(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname, 0);
+    conf.setBoolean(METASTORE_AGGREGATE_STATS_CACHE_ENABLED.varname, false);
+
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+
+    DFSTestUtil.createKey("test_key", miniDFSCluster, conf);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+      put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    }}, "test_key");
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    FileUtils.deleteQuietly(new File(jksFile));
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  @Test
+  public void targetAndSourceHaveDifferentEncryptionZoneKeys() throws Throwable {
+    DFSTestUtil.createKey("test_key123", miniDFSCluster, conf);
+
+    WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster,
+        new HashMap<String, String>() {{
+          put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+          put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+        }}, "test_key123");
+
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create table encrypted_table (id int, value string)")
+            .run("insert into table encrypted_table values (1,'value1')")
+            .run("insert into table encrypted_table values (2,'value2')")
+            .dump(primaryDbName, null);
+
+    replica
+        .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
+            + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select value from encrypted_table") + .verifyFailure(new String[] { "value1", "value2" }); + } + + @Ignore("this is ignored as minidfs cluster as of writing this test looked like did not copy the " + + "files correctly") + @Test + public void targetAndSourceHaveSameEncryptionZoneKeys() throws Throwable { + WarehouseInstance replica = new WarehouseInstance(LOG, miniDFSCluster, + new HashMap() {{ + put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false"); + put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); + put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + }}, "test_key"); + + WarehouseInstance.Tuple tuple = + primary.run("use " + primaryDbName) + .run("create table encrypted_table (id int, value string)") + .run("insert into table encrypted_table values (1,'value1')") + .run("insert into table encrypted_table values (2,'value2')") + .dump(primaryDbName, null); + + replica + .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation + + "' with('hive.repl.add.raw.reserved.namespace'='true')") + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select value from encrypted_table") + .verifyResults(new String[] { "value1", "value2" }); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 0b46c0495b..0918d33a21 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -69,8 +70,8 @@ private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName(); - WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map overridesForHiveConf) - throws Exception { + WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map overridesForHiveConf, + String keyNameForEncryptedZone) throws Exception { this.logger = logger; this.miniDFSCluster = cluster; assert miniDFSCluster.isClusterUp(); @@ -78,15 +79,28 @@ DistributedFileSystem fs = miniDFSCluster.getFileSystem(); Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier); + if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) { + fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone); + } Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier); this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString(); initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf); } - WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { + WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone) + throws Exception { this(logger, cluster, new HashMap() {{ put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); - }}); + }}, keyNameForEncryptedZone); + } + + WarehouseInstance(Logger logger, MiniDFSCluster cluster, + Map overridesForHiveConf) throws Exception { + this(logger, cluster, overridesForHiveConf, null); + } + + WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { + this(logger, cluster, 
   }
 
   private void initialize(String cmRoot, String warehouseRoot,
@@ -236,6 +250,20 @@ WarehouseInstance verifyResults(String[] data) throws IOException {
     return this;
   }
 
+  WarehouseInstance verifyFailure(String[] data) throws IOException {
+    List<String> results = getOutput();
+    logger.info("Expecting {}", StringUtils.join(data, ","));
+    logger.info("Got {}", results);
+    boolean dataMatched = (data.length == results.size());
+    if (dataMatched) {
+      for (int i = 0; i < data.length; i++) {
+        dataMatched &= data[i].toLowerCase().equals(results.get(i).toLowerCase());
+      }
+    }
+    assertFalse(dataMatched);
+    return this;
+  }
+
   /**
    * verify's result without regard for ordering.
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index f24d1b6502..4e61280c9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -31,25 +32,28 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class CopyUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
+  // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
+  private static final int MAX_COPY_RETRY = 3;
 
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
   private final long maxNumberOfFiles;
   private final boolean hiveInTest;
   private final String copyAsUser;
-  private final int MAX_COPY_RETRY = 3;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -107,8 +111,7 @@ private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> fileList,
                            FileSystem destinationFs, Path destination,
                            boolean useRegularCopy) throws IOException, LoginException {
     int repeat = 0;
-    List<Path> pathList = Lists.transform(fileList,
-        fileInfo -> { return fileInfo.getEffectivePath(); });
+    List<Path> pathList = Lists.transform(fileList, ReplChangeManager.FileInfo::getEffectivePath);
     while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) {
       try {
         doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy);
@@ -143,21 +146,62 @@ private void doCopyOnce(FileSystem sourceFs, List<Path> srcList,
                           boolean useRegularCopy) throws IOException, LoginException {
     UserGroupInformation ugi = Utils.getUGI();
     String currentUser = ugi.getShortUserName();
-    boolean usePrivilegedDistCp = copyAsUser != null && !currentUser.equals(copyAsUser);
+    boolean usePrivilegedUser = copyAsUser != null && !currentUser.equals(copyAsUser);
 
     if (useRegularCopy) {
-      Path[] paths = srcList.toArray(new Path[] {});
-      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
+      doRegularCopyOnce(sourceFs, srcList, destinationFs, destination, usePrivilegedUser);
+    } else {
+      doDistCpCopyOnce(sourceFs, srcList, destination, usePrivilegedUser);
+    }
+  }
+
+  private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
+                                boolean usePrivilegedUser) throws IOException {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+      srcList = srcList.stream().map(path -> {
+        URI uri = path.toUri();
+        return new Path(uri.getScheme(), uri.getAuthority(),
+            RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+      }).collect(Collectors.toList());
+      URI destinationUri = destination.toUri();
+      destination = new Path(destinationUri.getScheme(), destinationUri.getAuthority(),
+          RAW_RESERVED_VIRTUAL_PATH + destinationUri.getPath());
+      hiveConf.set("distcp.options.px", "");
+    }
+
+    FileUtils.distCp(
+        sourceFs,     // source file system
+        srcList,      // list of source paths
+        destination,
+        false,
+        usePrivilegedUser ? copyAsUser : null,
+        hiveConf,
+        ShimLoader.getHadoopShims()
+    );
+  }
+
+  private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList, FileSystem destinationFs,
+                                 Path destination, boolean usePrivilegedUser) throws IOException {
+    /*
+     Even for a regular copy we have to use the same user that distCp would use, since the
+     HiveServer2 user might be different from the superuser required to copy the relevant files.
+    */
+    final Path[] paths = srcList.toArray(new Path[] {});
+    if (usePrivilegedUser) {
+      final Path finalDestination = destination;
+      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+          copyAsUser, UserGroupInformation.getLoginUser());
+      try {
+        proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+          FileUtil.copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
+          return true;
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     } else {
-      FileUtils.distCp(
-          sourceFs, // source file system
-          srcList, // list of source paths
-          destination,
-          false,
-          usePrivilegedDistCp ? copyAsUser : null,
-          hiveConf,
-          ShimLoader.getHadoopShims()
-      );
+      FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     }
   }
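Note on the copy path above: when hive.repl.add.raw.reserved.namespace is enabled, doDistCpCopyOnce rewrites every source path and the destination into HDFS's /.reserved/raw/ virtual namespace and sets distcp.options.px so that DistCp preserves extended attributes; a superuser DistCp then moves the still-encrypted bytes together with the raw.* xattrs that carry the per-file encryption metadata, without decrypting on the source or re-encrypting on the target. A minimal standalone sketch of that path mapping follows; the class and method names are illustrative and not part of the patch.

import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

public class RawReservedPaths {
  // Same prefix the patch uses; see the TransparentEncryption doc linked in CopyUtils.
  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";

  // Maps hdfs://nn:8020/warehouse/db.db/t/000000_0
  //   to hdfs://nn:8020/.reserved/raw/warehouse/db.db/t/000000_0
  static Path toRawReserved(Path path) {
    URI uri = path.toUri();
    return new Path(uri.getScheme(), uri.getAuthority(),
        RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
  }

  // Applies the same rewrite to a whole list of source paths, as doDistCpCopyOnce does.
  static List<Path> toRawReserved(List<Path> paths) {
    return paths.stream().map(RawReservedPaths::toRawReserved).collect(Collectors.toList());
  }
}

On the SQL side, the tests drive this with REPL LOAD ... WITH('hive.repl.add.raw.reserved.namespace'='true'); this is only meaningful when the same encryption key exists on source and target, since the copied bytes are never decrypted in transit.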