diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e7724f9084..aafc88e44f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -452,6 +452,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPLCMRETIAN("hive.repl.cm.retain","24h", new TimeValidator(TimeUnit.HOURS), "Time to retain removed files in cmrootdir."), + REPLCMENCRYPTEDDIR("hive.encrypted.repl.cmrootdir", "/cmroot/", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMINTERVAL("hive.repl.cm.interval","3600s", new TimeValidator(TimeUnit.SECONDS), "Inteval for cmroot cleanup thread."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java new file mode 100644 index 0000000000..230f6d8ada --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java @@ -0,0 +1,1162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.junit.Assert.*; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + + +/** + * TestMetaStoreMultipleEncryptionZones.
+ */ +public class TestMetaStoreMultipleEncryptionZones { + private static HiveMetaStoreClient client; + private static HiveConf hiveConf; + private static Configuration conf; + private static Warehouse warehouse; + private static FileSystem warehouseFs; + private static MiniDFSCluster m_dfs; + private static String cmroot; + private static FileSystem fs; + private static HadoopShims.HdfsEncryptionShim shimCm; + private static String cmrootEncrypted; + private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks"; + + @BeforeClass + public static void setUp() throws Exception { + //Create secure cluster + conf = new Configuration(); + conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile); + m_dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + DFSTestUtil.createKey("test_key_cm", m_dfs, conf); + DFSTestUtil.createKey("test_key_db", m_dfs, conf); + hiveConf = new HiveConf(TestReplChangeManager.class); + hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + m_dfs.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + + cmroot = "hdfs://" + m_dfs.getNameNode().getHostAndPort() + "/cmroot"; + cmrootEncrypted = "/cmrootEncrypted/"; + hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot); + hiveConf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, cmrootEncrypted); + warehouse = new Warehouse(hiveConf); + warehouseFs = warehouse.getWhRoot().getFileSystem(hiveConf); + fs = new Path(cmroot).getFileSystem(hiveConf); + fs.mkdirs(warehouse.getWhRoot()); + + //Create cm in encrypted zone + shimCm = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf); + + try { + client = new HiveMetaStoreClient(hiveConf); + } catch (Throwable e) { + System.err.println("Unable to open the metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + @AfterClass + public static void tearDown() throws Exception { + try { + m_dfs.shutdown(); + client.close(); + } catch (Throwable e) { + System.err.println("Unable to close metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + @Test + public void dropTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + 
.create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { client.getTable(dbName, tblName1); + } catch 
(NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithSameEncryptionZones() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + 
typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithSameEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException 
e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new 
FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + 
assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void truncateTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + 
.setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithSameEncryptionZones() throws Throwable { + String dbName = "simpdb"; + String tblName1 = "simptbl1"; + String tblName2 = "simptbl2"; + String typeName = "Person"; + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new 
Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb"; + String tblName = "simptbl"; + String typeName = "Person"; + client.dropTable(dbName, tblName); + silentDropDatabase(dbName, client); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl2 = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl2); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName, null); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void recycleFailureWithDifferentEncryptionZonesForCm() throws Throwable { + + Path dirDb = new Path(warehouse.getWhRoot(), "db3"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, "tbl1"); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false); + } catch (RemoteException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + } + + + + + private static void silentDropDatabase(String dbName, HiveMetaStoreClient client) throws MetaException, TException{ + try { + for (String tableName : client.getTables(dbName, "*")) { + client.dropTable(dbName, 
tableName); + } + client.dropDatabase(dbName); + } catch (NoSuchObjectException|InvalidOperationException e) { + // NOP + } + } + + private void createFile(Path path, String content) throws IOException { + FSDataOutputStream output = path.getFileSystem(hiveConf).create(path); + output.writeChars(content); + output.close(); + } + + private void delete(Path path) throws IOException { + path.getFileSystem(hiveConf).delete(path, true); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java index 5c5dec4865..248fe0f5a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -308,7 +309,8 @@ private void setArchived(Partition p, Path harPath, int level) { private void deleteIntermediateOriginalDir(Table table, Path intermediateOriginalDir) throws HiveException { if (HdfsUtils.pathExists(intermediateOriginalDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, context.getDb().getDatabase(table.getDbName()), + AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), context.getConf()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java index ebac6f68d2..a95d39baa6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java @@ -95,10 +95,10 @@ static Path getInterMediateDir(Path dir, Configuration conf, ConfVars suffixConf return new Path(dir.getParent(), dir.getName() + intermediateDirSuffix); } - static void deleteDir(Path dir, Database db, Configuration conf) throws HiveException { + static void deleteDir(Path dir, boolean shouldEnableCm, Configuration conf) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true, db); + wh.deleteDir(dir, true, false, shouldEnableCm); } catch (MetaException e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java index 4f791a3f4d..39416ede9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; @@ -282,8 +283,9 @@ private void setUnArchived(Partition partition) { private void deleteIntermediateArchivedDir(Table table, Path intermediateArchivedDir) throws 
HiveException { if (HdfsUtils.pathExists(intermediateArchivedDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, context.getDb().getDatabase(table.getDbName()), - context.getConf()); + AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), + context.getConf()); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 75a0ea5d19..9c507a11b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2369,7 +2369,7 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map srcPaths, Path dst, Configuration conf, UserGroup @VisibleForTesting public void createEncryptionZone(Path path, String keyName) throws IOException; + /** + * get encryption zone by path + * @param path HDFS path to create encryption zone + * @throws IOException + */ + public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException; + /** * Creates an encryption key. * @@ -624,6 +632,11 @@ public void createEncryptionZone(Path path, String keyName) { /* not supported */ } + @Override + public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { + return null; + } + @Override public void createKey(String keyName, int bitLength) { /* not supported */ diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml index e252f12b14..a969b704e0 100644 --- a/standalone-metastore/metastore-common/pom.xml +++ b/standalone-metastore/metastore-common/pom.xml @@ -256,6 +256,12 @@ mockito-core test + + org.apache.hive.shims + hive-shims-common + 4.0.0-SNAPSHOT + compile + diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index c6acc57a97..88beec9bfa 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -34,13 +35,21 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim; + import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -50,7 +59,8 @@ private static boolean inited = false; private static boolean enabled = false; - private static Path cmroot; + private static Map cmRootMapping = new HashMap<>(); + private static HadoopShims hadoopShims = ShimLoader.getHadoopShims(); private static Configuration conf; private String msUser; private String msGroup; @@ -61,6 +71,7 @@ public static final String SOURCE_OF_REPLICATION = "repl.source.for"; private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]"; static final String CM_THREAD_NAME_PREFIX = "cmclearer-"; + private static final String NO_ENCRYPTION = "noEncryption"; public enum RecycleType { MOVE, @@ -138,15 +149,18 @@ private ReplChangeManager(Configuration conf) throws MetaException { if (!inited) { if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) { ReplChangeManager.enabled = true; - ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR)); ReplChangeManager.conf = conf; + //Create default cm root + Path cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR)); FileSystem cmFs = cmroot.getFileSystem(conf); // Create cmroot with permission 700 if not exist if (!cmFs.exists(cmroot)) { cmFs.mkdirs(cmroot); cmFs.setPermission(cmroot, new FsPermission("700")); } + cmRootMapping.put(NO_ENCRYPTION, cmroot); + UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); msUser = usergroupInfo.getShortUserName(); msGroup = usergroupInfo.getPrimaryGroupName(); @@ -180,7 +194,7 @@ public boolean accept(Path p){ * @return int * @throws IOException */ - public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException { + public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException, MetaException { if (!enabled) { return 0; } @@ -194,7 +208,7 @@ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOExcept } } else { String fileCheckSum = checksumFor(path, fs); - Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString()); + Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString()); // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting @@ -213,9 +227,18 @@ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOExcept switch (type) { case MOVE: { LOG.info("Moving {} to {}", path.toString(), cmPath.toString()); - // Rename fails if the file with same name already exist. 
- success = fs.rename(path, cmPath); + Retry retriable = new Retry(IOException.class) { + @Override + public Boolean execute() throws IOException { + return fs.rename(path, cmPath); + } + }; + try { + success = retriable.run(); + } catch (Exception e) { + throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } break; } case COPY: { @@ -361,9 +384,10 @@ public static String encodeFileUri(String fileUriStr, String fileChecksum, Strin throw new IllegalStateException("Uninitialized ReplChangeManager instance."); } String encodedUri = fileUriStr; - if ((fileChecksum != null) && (cmroot != null)) { + Path cmRoot = getCmRoot(new Path(fileUriStr)); + if ((fileChecksum != null) && (cmRoot != null)) { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum - + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf); + + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf); } else { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR; } @@ -461,12 +485,19 @@ static void scheduleCMClearer(Configuration conf) { .namingPattern(CM_THREAD_NAME_PREFIX + "%d") .daemon(true) .build()); - executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR), - MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), - 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); + for (String cmRoot : cmRootMapping.keySet()) { + executor.scheduleAtFixedRate(new CMClearer(cmRoot, + MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), + 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); + } } } + public static boolean shouldEnableCm(Database db, Table table) { + assert (table != null); + return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table); + } + public static boolean isSourceOfReplication(Database db) { assert (db != null); String replPolicyIds = getReplPolicyIdString(db); @@ -493,4 +524,29 @@ public static String joinWithSeparator(Iterable strings) { public static String[] getListFromSeparatedString(String commaSeparatedString) { return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*"); } + + private static Path getCmRoot(Path path) throws IOException { + Path cmroot = null; + HdfsEncryptionShim pathEncryptionShim = hadoopShims.createHdfsEncryptionShim(path.getFileSystem(conf), conf); + if (!pathEncryptionShim.isPathEncrypted(path)) { + cmroot = cmRootMapping.get(NO_ENCRYPTION); + } else { + EncryptionZone encryptionZone = pathEncryptionShim.getEncryptionZoneForPath(path); + cmroot = cmRootMapping.get(encryptionZone.getPath()); + if (cmroot == null) { + synchronized (instance) { + cmroot = new Path(path.getFileSystem(conf).getUri() + encryptionZone.getPath() + + MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR)); + FileSystem cmFs = cmroot.getFileSystem(conf); + // Create cmroot with permission 700 if not exist + if (!cmFs.exists(cmroot)) { + cmFs.mkdirs(cmroot); + cmFs.setPermission(cmroot, new FsPermission("700")); + } + cmRootMapping.put(encryptionZone.getPath(), cmroot); + } + } + } + return cmroot; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 38417be5d7..d4e9addfa9 100644 --- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -915,6 +915,8 @@ public static ConfVars getMetaConf(String name) { "This class is used to store and retrieval of raw metadata objects such as table, database"), REPLCMDIR("metastore.repl.cmrootdir", "hive.repl.cmrootdir", "/user/${system:user.name}/cmroot/", "Root dir for ChangeManager, used for deleted files."), + REPLCMENCRYPTEDDIR("metastore.encrypted.repl.cmrootdir", "hive.encrypted.repl.cmrootdir", "/cmroot/", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS, "Time to retain removed files in cmrootdir."), REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS, diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java new file mode 100644 index 0000000000..499d68bdd9 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import org.apache.hadoop.hive.metastore.api.MetaException; + +public abstract class Retry<T> { + + public static final int MAX_RETRIES = 3; + private int tries = 0; + private Class retryExceptionType; + + public Retry(Class exceptionClassType) { + this.retryExceptionType = exceptionClassType; + } + + public abstract T execute() throws Exception; + + public T run() throws Exception { + try { + return execute(); + } catch(Exception e) { + if (e.getClass().equals(retryExceptionType)){ + tries++; + if (MAX_RETRIES == tries) { + throw e; + } else { + return run(); + } + } else { + throw e; + } + } + } +} \ No newline at end of file diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java new file mode 100644 index 0000000000..b42042304a --- /dev/null +++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for retriable interface + */ +public class RetryTest { + @Test + public void testRetrySuccess() { + Retry retriable = new Retry(NullPointerException.class) { + @Override + public Void execute() { + throw new NullPointerException(); + } + }; + try { + retriable.run(); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + } + } + + @Test + public void testRetryFailure() { + Retry retriable = new Retry(NullPointerException.class) { + @Override + public Void execute() { + throw new RuntimeException(); + } + }; + try { + retriable.run(); + } catch (Exception e) { + Assert.assertEquals(RuntimeException.class, e.getClass()); + } + + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 5eaf53f79b..8d925aa8a1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -272,7 +272,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam } // check that src exists and also checks permissions necessary, rename src to dest if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, - ReplChangeManager.isSourceOfReplication(olddb))) { + ReplChangeManager.shouldEnableCm(olddb, oldt))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -428,7 +428,8 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam Path deleteOldDataLoc = new Path(oldt.getSd().getLocation()); boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge")); try { - wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb); + wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, + ReplChangeManager.shouldEnableCm(olddb, oldt)); LOG.info("Deleted the old data location: {} for the table: {}", deleteOldDataLoc, dbname + "." 
+ name); } catch (MetaException ex) { @@ -651,7 +652,7 @@ public Partition alterPartition(RawStore msdb, Warehouse wh, String catName, Str } //rename the data directory - wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db)); + wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl)); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 94698e6771..f68d741f35 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2226,7 +2226,7 @@ private void create_table_core(final RawStore ms, final CreateTableRequest req) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true, db); + wh.deleteDir(tblPath, true, false, ReplChangeManager.shouldEnableCm(db, tbl)); } } @@ -2804,10 +2804,9 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S } else if (tableDataShouldBeDeleted) { // Data needs deletion. Check if trash may be skipped. // Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge, db); + deletePartitionData(partPaths, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); // Delete the data in the table - deleteTableData(tblPath, ifPurge, db); - // ok even if the data is not deleted + deleteTableData(tblPath, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); } if (!listeners.isEmpty()) { @@ -2835,17 +2834,25 @@ private boolean checkTableDataShouldBeDeleted(Table tbl, boolean deleteData) { * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse - * @param db database the table belongs to + * @param shouldEnableCm If cm should be enabled */ - private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { + private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) throws MetaException { + if (tablePath != null) { + wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); + } + } + /** + * Deletes the data in a table's location, if it fails logs an error + * + * @param tablePath + * @param ifPurge completely purge the table (skipping trash) while removing + * data from warehouse + * @param db Database + */ + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) throws MetaException { if (tablePath != null) { - try { - wh.deleteDir(tablePath, true, ifPurge, db); - } catch (Exception e) { - LOG.error("Failed to delete table directory: " + tablePath + - " " + e.getMessage()); - } + wh.deleteDir(tablePath, true, ifPurge, db); } } @@ -2856,8 +2863,30 @@ private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { * @param partPaths * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse - * @param db database the partition belongs to + * @param shouldEnableCm If cm should be enabled */ + private void deletePartitionData(List partPaths, boolean ifPurge, boolean shouldEnableCm) { + if (partPaths != null && !partPaths.isEmpty()) { + for (Path partPath : partPaths) { + try { + wh.deleteDir(partPath, true, ifPurge, 
+          } catch (Exception e) {
+            LOG.error("Failed to delete partition directory: " + partPath +
+                " " + e.getMessage());
+          }
+        }
+      }
+    }
+
+    /**
+     * Give a list of partitions' locations, tries to delete each one
+     * and for each that fails logs an error.
+     *
+     * @param partPaths
+     * @param ifPurge completely purge the partition (skipping trash) while
+     *                removing data from warehouse
+     * @param db Database
+     */
    private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) {
      if (partPaths != null && !partPaths.isEmpty()) {
        for (Path partPath : partPaths) {
@@ -2865,7 +2894,7 @@ private void deletePartitionData(List partPaths, boolean ifPurge, Database
          wh.deleteDir(partPath, true, ifPurge, db);
        } catch (Exception e) {
          LOG.error("Failed to delete partition directory: " + partPath +
-              " " + e.getMessage());
+            " " + e.getMessage());
        }
      }
    }
@@ -3123,7 +3152,7 @@ private void truncateTableInternal(String dbName, String tableName, List
            HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
            FileStatus targetStatus = fs.getFileStatus(location);
            String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
-            wh.deleteDir(location, true, isAutopurge, db);
+            wh.deleteDir(location, true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
            fs.mkdirs(location);
            HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
          } else {
@@ -3132,7 +3161,7 @@ private void truncateTableInternal(String dbName, String tableName, List
              continue;
            }
            for (final FileStatus status : statuses) {
-              wh.deleteDir(status.getPath(), true, isAutopurge, db);
+              wh.deleteDir(status.getPath(), true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl));
            }
          }
        }
@@ -3617,7 +3646,7 @@ private Partition append_partition_common(RawStore ms, String catName, String db
      if (!success) {
        ms.rollbackTransaction();
        if (madeDir) {
-          wh.deleteDir(partLocation, true, db);
+          wh.deleteDir(partLocation, true, false, ReplChangeManager.shouldEnableCm(db, tbl));
        }
      }
@@ -4350,8 +4379,8 @@ private Partition add_partition_core(final RawStore ms,
        success = ms.addPartition(part);
      } finally {
        if (!success && madeDir) {
-          wh.deleteDir(new Path(part.getSd().getLocation()), true,
-              ms.getDatabase(tbl.getCatName(), tbl.getDbName()));
+          wh.deleteDir(new Path(part.getSd().getLocation()), true, false,
+              ReplChangeManager.shouldEnableCm(ms.getDatabase(part.getCatName(), part.getDbName()), tbl));
        }
      }
@@ -4621,7 +4650,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam
    Path archiveParentDir = null;
    boolean mustPurge = false;
    boolean tableDataShouldBeDeleted = false;
-    boolean isSourceOfReplication = false;
+    boolean needsCm = false;
    Map<String, String> transactionalListenerResponses = Collections.emptyMap();

    if (db_name == null) {
@@ -4669,7 +4698,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam
                new DropPartitionEvent(tbl, part, true, deleteData, this),
                envContext);
          }
-          isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name));
+          needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, db_name), tbl);
          success = ms.commitTransaction();
        }
      } finally {
@@ -4688,11 +4717,11 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam
        if (isArchived) {
          assert (archiveParentDir != null);
-          wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication);
+          wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
        } else {
          assert (partPath != null);
-          wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication);
-          deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication);
+          wh.deleteDir(partPath, true, mustPurge, needsCm);
+          deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
        }
        // ok even if the data is not deleted
      }
@@ -4768,7 +4797,8 @@ public DropPartitionsResult drop_partitions_req(
    List parts = null;
    boolean mustPurge = false;
    List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
-    boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName));
+    boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName),
+        ms.getTable(catName, dbName, tblName));

    try {
      // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
@@ -4876,12 +4906,12 @@ public DropPartitionsResult drop_partitions_req(
          // Archived partitions have har:/to_har_file as their location.
          // The original directory was saved in params
          for (Path path : archToDelete) {
-            wh.deleteDir(path, true, mustPurge, isSourceOfReplication);
+            wh.deleteDir(path, true, mustPurge, needsCm);
          }
          for (PathAndPartValSize p : dirsToDelete) {
-            wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication);
+            wh.deleteDir(p.path, true, mustPurge, needsCm);
            try {
-              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication);
+              deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, needsCm);
            } catch (IOException ex) {
              LOG.warn("Error from deleteParentRecursive", ex);
              throw new MetaException("Failed to delete parent: " + ex.getMessage());
@@ -7776,7 +7806,7 @@ public void drop_function(String dbName, String funcName)
      if (func == null) {
        throw new NoSuchObjectException("Function " + funcName + " does not exist");
      }
-      Boolean isSourceOfReplication =
+      Boolean needsCm =
          ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]));

      // if copy of jar to change management fails we fail the metastore transaction, since the
@@ -7784,7 +7814,7 @@ public void drop_function(String dbName, String funcName)
      // a copy is required to allow incremental replication to work correctly.
      if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
        for (ResourceUri uri : func.getResourceUris()) {
-          if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) {
+          if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) {
            wh.addToChangeManagement(new Path(uri.getUri()));
          }
        }
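
Editor's note (not part of the patch): the recurring change above swaps ReplChangeManager.isSourceOfReplication(db) for ReplChangeManager.shouldEnableCm(db, tbl) at the call sites that rename or delete data, so the change-management decision can take the table into account as well as the database. The body of shouldEnableCm is not included in this excerpt; the sketch below only illustrates the likely shape of such a check, under the assumption that it keys off the database's "repl.source.for" (SOURCE_OF_REPLICATION) property and the table type. The class name CmDecisionSketch and the exact conditions are illustrative, not taken from the patch; the authoritative logic lives in ReplChangeManager.

    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.hadoop.hive.metastore.api.Table;

    // Illustrative sketch only; not the code added by this patch.
    final class CmDecisionSketch {
      // Assumption: change management is wanted when the database is flagged as a
      // replication source and the table is managed, since external table data is
      // not recycled into cmroot on delete/rename.
      static boolean shouldEnableCm(Database db, Table table) {
        boolean replSource = db.getParameters() != null
            && db.getParameters().containsKey("repl.source.for");   // SOURCE_OF_REPLICATION key
        boolean external = "EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType());
        return replSource && !external;
      }
    }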