diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index be83489cb3..4a83531559 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -462,6 +462,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d",
         new TimeValidator(TimeUnit.DAYS),
         "TTL of dump dirs before cleanup."),
+    //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+    REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
+        "For TDE with same encryption keys on source and target, allow Distcp super user to access \n"
+            + "the raw bytes from filesystem without decrypting on source and then encrypting on target."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationForEncryptedHDFSInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationForEncryptedHDFSInstances.java
new file mode 100644
index 0000000000..f6f68c5e50
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationForEncryptedHDFSInstances.java
@@ -0,0 +1,113 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class TestReplicationForEncryptedHDFSInstances {
+  private static String KEY_NAME = "test_key";
+  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+  @Rule
+  public final TestName testName = new TestName();
+
+  @Rule
+  public TestRule replV1BackwardCompatibility;
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+
+  private static WarehouseInstance primary, replica;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+    conf.setLong("hive.exec.copyfile.maxsize", 1);
+    conf.setBoolean("dfs.namenode.delegation.token.always-use", true);
+    // conf.setInt("dfs.namenode.list.encryption.zones.num.responses", 2);
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    DFSTestUtil.createKey(KEY_NAME, miniDFSCluster, conf);
+    DFSTestUtil.createKey(KEY_NAME + "123", miniDFSCluster, conf);
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+    }}, KEY_NAME);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>() {{
+      put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "false");
+      put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+          UserGroupInformation.getCurrentUser().getUserName());
+    }}, KEY_NAME + "123");
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+    FileUtils.deleteQuietly(new File(jksFile));
+  }
+
+  private String primaryDbName, replicatedDbName;
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompatibility = primary.getReplivationV1CompatRule(new ArrayList<>());
+    primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  @Test
+  public void replicationWhereTargetRunAndSourceOnSameTDE() throws Throwable {
+    WarehouseInstance.Tuple tuple =
+        primary.run("use " + primaryDbName)
+            .run("create table encrypted_table (id int, value string)")
+            .run("insert into table encrypted_table values (1,'some value')")
+            .run("insert into table encrypted_table values (2,'some other value')")
+            .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select value from encrypted_table")
+        .verifyResults(new String[] { "some value", "some other value" });
+
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 061817fc7f..8cfe9c7690 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -68,8 +68,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf)
-      throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
+      String keyNameForEncryptedZone) throws Exception {
     this.logger = logger;
     this.miniDFSCluster = cluster;
     assert miniDFSCluster.isClusterUp();
@@ -77,15 +77,28 @@ Licensed to the Apache Software Foundation (ASF) under one
     DistributedFileSystem fs = miniDFSCluster.getFileSystem();
 
     Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
+    if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) {
+      fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone);
+    }
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
     initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
   }
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone)
+      throws Exception {
     this(logger, cluster, new HashMap<String, String>() {{
       put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
-    }});
+    }}, keyNameForEncryptedZone);
+  }
+
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+      Map<String, String> overridesForHiveConf) throws Exception {
+    this(logger, cluster, overridesForHiveConf, null);
+  }
+
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+    this(logger, cluster, (String) null);
   }
 
   private void initialize(String cmRoot, String warehouseRoot,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index f24d1b6502..4e7826f542 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse.repl;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -31,18 +32,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 import javax.security.auth.login.LoginException;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class CopyUtils {
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
+  // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
+  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
 
   private final HiveConf hiveConf;
   private final long maxCopyFileSize;
@@ -149,6 +152,18 @@ private void doCopyOnce(FileSystem sourceFs, List<Path> srcList,
       Path[] paths = srcList.toArray(new Path[] {});
       FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     } else {
+      if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+        srcList = srcList.stream().map(path -> {
+          URI uri = path.toUri();
+          return new Path(uri.getScheme(), uri.getAuthority(),
+              RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+        }).collect(Collectors.toList());
+        URI destinationUri = destination.toUri();
+        destination = new Path(destinationUri.getScheme(), destinationUri.getAuthority(),
+            RAW_RESERVED_VIRTUAL_PATH + destinationUri.getPath());
+      }
+
+
       FileUtils.distCp(
           sourceFs, // source file system
           srcList,  // list of source paths
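Aside (not part of the patch): the CopyUtils change above rewrites every source path and the destination into the HDFS ".reserved/raw" virtual namespace before handing them to distcp, so the DistCp super user copies the encrypted bytes verbatim instead of decrypting on the source and re-encrypting on the target. Below is a minimal standalone sketch of that path rewrite, using only java.net.URI so it runs without Hadoop on the classpath; the class name, helper name, and example namenode URI are made up for illustration.

import java.net.URI;
import java.net.URISyntaxException;

// Illustration only: mirrors the path rewrite CopyUtils performs when
// hive.repl.add.raw.reserved.namespace is enabled.
public class RawReservedPathExample {
  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";

  // Rebuild the URI with the raw-reserved prefix in front of the original path,
  // keeping scheme and authority (e.g. hdfs://nn1:8020) unchanged.
  static URI toRawReserved(URI uri) throws URISyntaxException {
    return new URI(uri.getScheme(), uri.getAuthority(),
        RAW_RESERVED_VIRTUAL_PATH + uri.getPath(), null, null);
  }

  public static void main(String[] args) throws URISyntaxException {
    URI src = URI.create("hdfs://nn1:8020/warehouse/db.db/encrypted_table/000000_0");
    // Prints hdfs://nn1:8020/.reserved/raw//warehouse/db.db/encrypted_table/000000_0
    // (java.net.URI keeps the doubled slash; Hadoop's Path normalizes it in the actual code path).
    System.out.println(toRawReserved(src));
  }
}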