diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e7724f9084..35d1eed3dd 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -452,6 +452,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPLCMRETIAN("hive.repl.cm.retain","24h", new TimeValidator(TimeUnit.HOURS), "Time to retain removed files in cmrootdir."), + REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMINTERVAL("hive.repl.cm.interval","3600s", new TimeValidator(TimeUnit.SECONDS), "Inteval for cmroot cleanup thread."), diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 66bdee1f48..08878189ef 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -274,6 +274,8 @@ public static void connectToMetastore() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName()); + conf.set(HiveConf.ConfVars.REPLCMDIR, "cmroot"); + conf.set(ConfVars.REPLCMENABLED, "true"); MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANUP_SLEEP_TIME, TimeUnit.SECONDS); MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, JSONMessageEncoder.class.getName()); conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java new file mode 100644 index 0000000000..ad62bf6b6d --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java @@ -0,0 +1,1148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for metastore drop/truncate and ReplChangeManager behaviour across multiple HDFS encryption zones.
+ */
+public class TestMetaStoreMultipleEncryptionZones {
+  private static HiveMetaStoreClient client;
+  private static HiveConf hiveConf;
+  private static Configuration conf;
+  private static Warehouse warehouse;
+  private static FileSystem warehouseFs;
+  private static MiniDFSCluster miniDFSCluster;
+  private static String cmroot;
+  private static FileSystem fs;
+  private static HadoopShims.HdfsEncryptionShim shimCm;
+  private static String cmrootEncrypted;
+  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    //Create secure cluster
+    conf = new Configuration();
+    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);
+    miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    DFSTestUtil.createKey("test_key_cm", miniDFSCluster, conf);
+    DFSTestUtil.createKey("test_key_db", miniDFSCluster, conf);
+    hiveConf = new HiveConf(TestReplChangeManager.class);
+    hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+
+    cmroot = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+    cmrootEncrypted = "/cmrootEncrypted/";
+    hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+    hiveConf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, cmrootEncrypted);
+    warehouse = new Warehouse(hiveConf);
+    warehouseFs = warehouse.getWhRoot().getFileSystem(hiveConf);
+    fs = new Path(cmroot).getFileSystem(hiveConf);
+    fs.mkdirs(warehouse.getWhRoot());
+
+    //Create cm in encrypted zone
+    shimCm = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+
+    try {
+      client = new HiveMetaStoreClient(hiveConf);
+    } catch (Throwable e) {
+      System.err.println("Unable to open the
metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + @AfterClass + public static void tearDown() throws Exception { + try { + miniDFSCluster.shutdown(); + client.close(); + } catch (Throwable e) { + System.err.println("Unable to close metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + @Test + public void dropTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "encrdb1"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "encrdb2"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", 
ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part2"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithSameEncryptionZones() throws Throwable { + String dbName = "encrdb3"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new 
Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb1"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithSameEncryptionZonesForCm() throws Throwable { + String dbName = "encrdb4"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + 
.addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "encrdb5"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, 
"testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "encrdb6"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException 
e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb2"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void truncateTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "encrdb7"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + client.dropTable(dbName, tblName1); + client.dropTable(dbName, tblName2); + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + 
createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "encrdb8"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + client.dropTable(dbName, tblName1); + client.dropTable(dbName, tblName2); + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch 
(NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithSameEncryptionZones() throws Throwable { + String dbName = "encrdb9"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + client.dropTable(dbName, tblName1); + client.dropTable(dbName, tblName2); + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb3"; + String tblName = "simptbl"; + String typeName = "Person"; + client.dropTable(dbName, tblName); + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + 
.setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl2 = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl2); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName, null); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void recycleFailureWithDifferentEncryptionZonesForCm() throws Throwable { + + Path dirDb = new Path(warehouse.getWhRoot(), "db3"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, "tbl1"); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false); + } catch (RemoteException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + } + + + private void createFile(Path path, String content) throws IOException { + FSDataOutputStream output = path.getFileSystem(hiveConf).create(path); + output.writeChars(content); + output.close(); + } + + private void silentDropDatabase(String dbName) throws TException { + try { + for (String tableName : client.getTables(dbName, "*")) { + client.dropTable(dbName, tableName); + } + client.dropDatabase(dbName); + } catch (NoSuchObjectException|InvalidOperationException e) { + // NOP + } + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java index 562b2c9763..19d38d2f04 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java @@ -48,6 +48,8 @@ public void setUp() throws Exception { MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true); MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true); MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false); + MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true); + MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, "cmroot"); MetaStoreTestUtils.setConfForStandloneMode(conf); hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 92a0bbe806..a5005dc613 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import 
org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.processors.DfsProcessor; @@ -202,6 +203,8 @@ public static void setUpBeforeClass() throws SQLException, ClassNotFoundExceptio System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider"); System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false"); + System.setProperty(ConfVars.REPLCMENABLED.varname, "true"); + System.setProperty(ConfVars.REPLCMDIR.varname, "cmroot"); con = getConnection(defaultDbName + ";create=true"); Statement stmt = con.createStatement(); assertNotNull("Statement is null", stmt); @@ -2828,6 +2831,8 @@ public void testGetQueryLogForReplCommands() throws Exception { stmt.execute("set hive.metastore.transactional.event.listeners =" + " org.apache.hive.hcatalog.listener.DbNotificationListener"); stmt.execute("set hive.metastore.dml.events = true"); + stmt.execute("set hive.repl.cm.enabled = true"); + stmt.execute("set hive.repl.cmrootdir = cmroot"); stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')"); stmt.execute("create table " + primaryTblName + " (id int)"); stmt.execute("insert into " + primaryTblName + " values (1), (2)"); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 03a1926440..66c425870a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -104,8 +104,11 @@ public static void setupBeforeClass() throws Exception { MiniHS2.cleanupLocalDir(); HiveConf conf = new HiveConf(); + conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "cmroot"); + conf.set(ConfVars.REPLCMENABLED.varname, "true"); dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + try { startMiniHS2(conf); } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java index 5c5dec4865..248fe0f5a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -308,7 +309,8 @@ private void setArchived(Partition p, Path harPath, int level) { private void deleteIntermediateOriginalDir(Table table, Path intermediateOriginalDir) throws HiveException { if (HdfsUtils.pathExists(intermediateOriginalDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, context.getDb().getDatabase(table.getDbName()), + AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), context.getConf()); } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java index ebac6f68d2..c285405522 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; @@ -95,10 +94,10 @@ static Path getInterMediateDir(Path dir, Configuration conf, ConfVars suffixConf return new Path(dir.getParent(), dir.getName() + intermediateDirSuffix); } - static void deleteDir(Path dir, Database db, Configuration conf) throws HiveException { + static void deleteDir(Path dir, boolean shouldEnableCm, Configuration conf) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true, db); + wh.deleteDir(dir, true, false, shouldEnableCm); } catch (MetaException e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java index 4f791a3f4d..39416ede9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; @@ -282,8 +283,9 @@ private void setUnArchived(Partition partition) { private void deleteIntermediateArchivedDir(Table table, Path intermediateArchivedDir) throws HiveException { if (HdfsUtils.pathExists(intermediateArchivedDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, context.getDb().getDatabase(table.getDbName()), - context.getConf()); + AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), + context.getConf()); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 75a0ea5d19..9c507a11b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2369,7 +2369,7 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map srcPaths, Path dst, Configuration conf, UserGroup public int comparePathKeyStrength(Path path1, Path path2) throws IOException; /** - * create encryption zone by path and keyname + * Create encryption zone by path and keyname. 
* @param path HDFS path to create encryption zone * @param keyName keyname * @throws IOException @@ -572,6 +573,13 @@ boolean runDistCpAs(List srcPaths, Path dst, Configuration conf, UserGroup @VisibleForTesting public void createEncryptionZone(Path path, String keyName) throws IOException; + /** + * Get encryption zone by path. + * @param path HDFS path to create encryption zone. + * @throws IOException + */ + EncryptionZone getEncryptionZoneForPath(Path path) throws IOException; + /** * Creates an encryption key. * @@ -624,6 +632,11 @@ public void createEncryptionZone(Path path, String keyName) { /* not supported */ } + @Override + public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { + return null; + } + @Override public void createKey(String keyName, int bitLength) { /* not supported */ diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml index e252f12b14..81dc6b6451 100644 --- a/standalone-metastore/metastore-common/pom.xml +++ b/standalone-metastore/metastore-common/pom.xml @@ -256,6 +256,11 @@ mockito-core test + + org.apache.hive + hive-shims + ${project.version} + diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index c6acc57a97..76719247e2 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -34,13 +36,21 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +60,8 @@ private static boolean inited = false; private static boolean enabled = false; - private static Path cmroot; + private static Set encryptionZones = new HashSet<>(); + private static HadoopShims hadoopShims = ShimLoader.getHadoopShims(); private static Configuration conf; private String msUser; private String msGroup; @@ -61,6 +72,9 @@ public static final String SOURCE_OF_REPLICATION = "repl.source.for"; private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]"; static final String CM_THREAD_NAME_PREFIX = "cmclearer-"; + private static final String 
NO_ENCRYPTION = "noEncryption"; + private static String cmRootDir; + private static String encryptedCmRootDir; public enum RecycleType { MOVE, @@ -138,15 +152,25 @@ private ReplChangeManager(Configuration conf) throws MetaException { if (!inited) { if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) { ReplChangeManager.enabled = true; - ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR)); ReplChangeManager.conf = conf; - + cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR); + encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR); + //Create default cm root + Path cmroot = new Path(cmRootDir); + HdfsEncryptionShim pathEncryptionShim = hadoopShims + .createHdfsEncryptionShim(cmroot.getFileSystem(conf), conf); + if (pathEncryptionShim.isPathEncrypted(cmroot)) { + LOG.warn(ConfVars.REPLCMDIR + " should not be encrypted. To pass cm dir for encrypted path use " + + ConfVars.REPLCMENCRYPTEDDIR); + } FileSystem cmFs = cmroot.getFileSystem(conf); // Create cmroot with permission 700 if not exist if (!cmFs.exists(cmroot)) { cmFs.mkdirs(cmroot); cmFs.setPermission(cmroot, new FsPermission("700")); } + encryptionZones.add(NO_ENCRYPTION); + UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); msUser = usergroupInfo.getShortUserName(); msGroup = usergroupInfo.getPrimaryGroupName(); @@ -194,7 +218,7 @@ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOExcept } } else { String fileCheckSum = checksumFor(path, fs); - Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString()); + Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString()); // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting @@ -213,9 +237,18 @@ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOExcept switch (type) { case MOVE: { LOG.info("Moving {} to {}", path.toString(), cmPath.toString()); - // Rename fails if the file with same name already exist. 
- success = fs.rename(path, cmPath); + Retry retriable = new Retry(IOException.class) { + @Override + public Boolean execute() throws IOException { + return fs.rename(path, cmPath); + } + }; + try { + success = retriable.run(); + } catch (Exception e) { + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } break; } case COPY: { @@ -361,9 +394,10 @@ public static String encodeFileUri(String fileUriStr, String fileChecksum, Strin throw new IllegalStateException("Uninitialized ReplChangeManager instance."); } String encodedUri = fileUriStr; - if ((fileChecksum != null) && (cmroot != null)) { + Path cmRoot = getCmRoot(new Path(fileUriStr)); + if ((fileChecksum != null) && (cmRoot != null)) { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum - + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf); + + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf); } else { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR; } @@ -404,12 +438,12 @@ public static boolean isCMFileUri(Path fromPath) { * Thread to clear old files of cmroot recursively */ static class CMClearer implements Runnable { - private Path cmroot; + private Set encryptionZones; private long secRetain; private Configuration conf; - CMClearer(String cmrootString, long secRetain, Configuration conf) { - this.cmroot = new Path(cmrootString); + CMClearer(Set encryptionZones, long secRetain, Configuration conf) { + this.encryptionZones = encryptionZones; this.secRetain = secRetain; this.conf = conf; } @@ -418,32 +452,39 @@ public static boolean isCMFileUri(Path fromPath) { public void run() { try { LOG.info("CMClearer started"); - - long now = System.currentTimeMillis(); - FileSystem fs = cmroot.getFileSystem(conf); - FileStatus[] files = fs.listStatus(cmroot); - - for (FileStatus file : files) { - long modifiedTime = file.getModificationTime(); - if (now - modifiedTime > secRetain*1000) { - try { - if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { - boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf); - if (succ) { - LOG.debug("Move " + file.toString() + " to trash"); - } else { - LOG.warn("Fail to move " + file.toString() + " to trash"); - } - } else { - boolean succ = fs.delete(file.getPath(), false); - if (succ) { - LOG.debug("Remove " + file.toString()); + for (String encryptionZone : encryptionZones) { + Path cmroot; + if (encryptionZone.equals(NO_ENCRYPTION)) { + cmroot = new Path(cmRootDir); + } else { + cmroot = new Path(encryptionZone + encryptedCmRootDir); + } + long now = System.currentTimeMillis(); + FileSystem fs = cmroot.getFileSystem(conf); + FileStatus[] files = fs.listStatus(cmroot); + + for (FileStatus file : files) { + long modifiedTime = file.getModificationTime(); + if (now - modifiedTime > secRetain * 1000) { + try { + if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { + boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf); + if (succ) { + LOG.debug("Move " + file.toString() + " to trash"); + } else { + LOG.warn("Fail to move " + file.toString() + " to trash"); + } } else { - LOG.warn("Fail to remove " + file.toString()); + boolean succ = fs.delete(file.getPath(), false); + if (succ) { + LOG.debug("Remove " + file.toString()); + } else { + LOG.warn("Fail to remove " + file.toString()); + } } + } catch (UnsupportedOperationException e) { + LOG.warn("Error getting xattr for " + file.getPath().toString()); } - } catch (UnsupportedOperationException e) { - 
LOG.warn("Error getting xattr for " + file.getPath().toString()); } } } @@ -461,12 +502,17 @@ static void scheduleCMClearer(Configuration conf) { .namingPattern(CM_THREAD_NAME_PREFIX + "%d") .daemon(true) .build()); - executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR), - MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), - 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); + executor.scheduleAtFixedRate(new CMClearer(encryptionZones, + MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), + 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } } + public static boolean shouldEnableCm(Database db, Table table) { + assert (table != null); + return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table); + } + public static boolean isSourceOfReplication(Database db) { assert (db != null); String replPolicyIds = getReplPolicyIdString(db); @@ -493,4 +539,30 @@ public static String joinWithSeparator(Iterable strings) { public static String[] getListFromSeparatedString(String commaSeparatedString) { return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*"); } + + private static Path getCmRoot(Path path) throws IOException { + Path cmroot; + HdfsEncryptionShim pathEncryptionShim = hadoopShims.createHdfsEncryptionShim(path.getFileSystem(conf), conf); + if (!pathEncryptionShim.isPathEncrypted(path)) { + cmroot = new Path(cmRootDir); + } else { + String encryptionZonePath = path.getFileSystem(conf).getUri() + + pathEncryptionShim.getEncryptionZoneForPath(path).getPath(); + cmroot = new Path(encryptionZonePath + encryptedCmRootDir); + if (!encryptionZones.contains(encryptionZonePath)){ + synchronized (instance) { + if (!encryptionZones.contains(encryptionZonePath)) { + FileSystem cmFs = cmroot.getFileSystem(conf); + // Create cmroot with permission 700 if not exist + if (!cmFs.exists(cmroot)) { + cmFs.mkdirs(cmroot); + cmFs.setPermission(cmroot, new FsPermission("700")); + } + encryptionZones.add(encryptionZonePath); + } + } + } + } + return cmroot; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 38417be5d7..8a63bf2106 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -915,6 +915,8 @@ public static ConfVars getMetaConf(String name) { "This class is used to store and retrieval of raw metadata objects such as table, database"), REPLCMDIR("metastore.repl.cmrootdir", "hive.repl.cmrootdir", "/user/${system:user.name}/cmroot/", "Root dir for ChangeManager, used for deleted files."), + REPLCMENCRYPTEDDIR("metastore.repl.cm.encryptionzone.rootdir", "hive.repl.cm.encryptionzone.rootdir", ".cmroot", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS, "Time to retain removed files in cmrootdir."), REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS, diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
new file mode 100644
index 0000000000..bdb269a34d
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public abstract class Retry<T> {
+
+  public static final int MAX_RETRIES = 3;
+  private int tries = 0;
+  private Class retryExceptionType;
+
+  public Retry(Class exceptionClassType) {
+    this.retryExceptionType = exceptionClassType;
+  }
+
+  public abstract T execute() throws Exception;
+
+  public T run() throws Exception {
+    try {
+      return execute();
+    } catch(Exception e) {
+      if (e.getClass().equals(retryExceptionType)){
+        tries++;
+        if (MAX_RETRIES == tries) {
+          throw e;
+        } else {
+          return run();
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
new file mode 100644
index 0000000000..15039468cd
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for retriable interface
+ */
+public class RetryTest {
+  @Test
+  public void testRetrySuccess() {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      @Override
+      public Void execute() {
+        throw new NullPointerException();
+      }
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class, e.getClass());
+    }
+  }
+
+  @Test
+  public void testRetryFailure() {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      @Override
+      public Void execute() {
+        throw new RuntimeException();
+      }
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
+      Assert.assertEquals(RuntimeException.class, e.getClass());
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 5eaf53f79b..8d925aa8a1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -272,7 +272,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam } // check that src exists and also checks permissions necessary, rename src to dest if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, - ReplChangeManager.isSourceOfReplication(olddb))) { + ReplChangeManager.shouldEnableCm(olddb, oldt))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -428,7 +428,8 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam Path deleteOldDataLoc = new Path(oldt.getSd().getLocation()); boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge")); try { - wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb); + wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, + ReplChangeManager.shouldEnableCm(olddb, oldt)); LOG.info("Deleted the old data location: {} for the table: {}", deleteOldDataLoc, dbname + "."
+ name); } catch (MetaException ex) { @@ -651,7 +652,7 @@ public Partition alterPartition(RawStore msdb, Warehouse wh, String catName, Str } //rename the data directory - wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db)); + wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl)); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 94698e6771..78691bf03a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2226,7 +2226,7 @@ private void create_table_core(final RawStore ms, final CreateTableRequest req) if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true, db); + wh.deleteDir(tblPath, true, false, ReplChangeManager.shouldEnableCm(db, tbl)); } } @@ -2804,10 +2804,9 @@ private boolean drop_table_core(final RawStore ms, final String catName, final S } else if (tableDataShouldBeDeleted) { // Data needs deletion. Check if trash may be skipped. // Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge, db); + deletePartitionData(partPaths, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); // Delete the data in the table - deleteTableData(tblPath, ifPurge, db); - // ok even if the data is not deleted + deleteTableData(tblPath, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); } if (!listeners.isEmpty()) { @@ -2835,16 +2834,34 @@ private boolean checkTableDataShouldBeDeleted(Table tbl, boolean deleteData) { * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse - * @param db database the table belongs to + * @param shouldEnableCm If cm should be enabled */ - private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { + private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) { + if (tablePath != null) { + try { + wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); + } catch (MetaException e) { + LOG.error("Failed to delete table directory: " + tablePath + + " " + e.getMessage()); + } + } + } + /** + * Deletes the data in a table's location, if it fails logs an error. 
+ * + * @param tablePath + * @param ifPurge completely purge the table (skipping trash) while removing + * data from warehouse + * @param db Database + */ + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { if (tablePath != null) { try { wh.deleteDir(tablePath, true, ifPurge, db); - } catch (Exception e) { + } catch (MetaException e) { LOG.error("Failed to delete table directory: " + tablePath + - " " + e.getMessage()); + " " + e.getMessage()); } } } @@ -2856,8 +2873,30 @@ private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { * @param partPaths * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse - * @param db database the partition belongs to + * @param shouldEnableCm If cm should be enabled */ + private void deletePartitionData(List partPaths, boolean ifPurge, boolean shouldEnableCm) { + if (partPaths != null && !partPaths.isEmpty()) { + for (Path partPath : partPaths) { + try { + wh.deleteDir(partPath, true, ifPurge, shouldEnableCm); + } catch (Exception e) { + LOG.error("Failed to delete partition directory: " + partPath + + " " + e.getMessage()); + } + } + } + } + + /** + * Give a list of partitions' locations, tries to delete each one + * and for each that fails logs an error. + * + * @param partPaths + * @param ifPurge completely purge the partition (skipping trash) while + * removing data from warehouse + * @param db Database + */ private void deletePartitionData(List partPaths, boolean ifPurge, Database db) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { @@ -2865,7 +2904,7 @@ private void deletePartitionData(List partPaths, boolean ifPurge, Database wh.deleteDir(partPath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete partition directory: " + partPath + - " " + e.getMessage()); + " " + e.getMessage()); } } } @@ -3123,7 +3162,7 @@ private void truncateTableInternal(String dbName, String tableName, List HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location); FileStatus targetStatus = fs.getFileStatus(location); String targetGroup = targetStatus == null ? 
null : targetStatus.getGroup(); - wh.deleteDir(location, true, isAutopurge, db); + wh.deleteDir(location, true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl)); fs.mkdirs(location); HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false); } else { @@ -3132,7 +3171,7 @@ private void truncateTableInternal(String dbName, String tableName, List continue; } for (final FileStatus status : statuses) { - wh.deleteDir(status.getPath(), true, isAutopurge, db); + wh.deleteDir(status.getPath(), true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl)); } } } @@ -3617,7 +3656,7 @@ private Partition append_partition_common(RawStore ms, String catName, String db if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(partLocation, true, db); + wh.deleteDir(partLocation, true, false, ReplChangeManager.shouldEnableCm(db, tbl)); } } @@ -4350,8 +4389,8 @@ private Partition add_partition_core(final RawStore ms, success = ms.addPartition(part); } finally { if (!success && madeDir) { - wh.deleteDir(new Path(part.getSd().getLocation()), true, - ms.getDatabase(tbl.getCatName(), tbl.getDbName())); + wh.deleteDir(new Path(part.getSd().getLocation()), true, false, + ReplChangeManager.shouldEnableCm(ms.getDatabase(part.getCatName(), part.getDbName()), tbl)); } } @@ -4621,7 +4660,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam Path archiveParentDir = null; boolean mustPurge = false; boolean tableDataShouldBeDeleted = false; - boolean isSourceOfReplication = false; + boolean needsCm = false; Map transactionalListenerResponses = Collections.emptyMap(); if (db_name == null) { @@ -4669,7 +4708,7 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam new DropPartitionEvent(tbl, part, true, deleteData, this), envContext); } - isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name)); + needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, db_name), tbl); success = ms.commitTransaction(); } } finally { @@ -4688,11 +4727,11 @@ private boolean drop_partition_common(RawStore ms, String catName, String db_nam if (isArchived) { assert (archiveParentDir != null); - wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication); + wh.deleteDir(archiveParentDir, true, mustPurge, needsCm); } else { assert (partPath != null); - wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication); - deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication); + wh.deleteDir(partPath, true, mustPurge, needsCm); + deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm); } // ok even if the data is not deleted } @@ -4768,7 +4807,8 @@ public DropPartitionsResult drop_partitions_req( List parts = null; boolean mustPurge = false; List> transactionalListenerResponses = Lists.newArrayList(); - boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName)); + boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName), + ms.getTable(catName, dbName, tblName)); try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. @@ -4876,12 +4916,12 @@ public DropPartitionsResult drop_partitions_req( // Archived partitions have har:/to_har_file as their location. 
// The original directory was saved in params for (Path path : archToDelete) { - wh.deleteDir(path, true, mustPurge, isSourceOfReplication); + wh.deleteDir(path, true, mustPurge, needsCm); } for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication); + wh.deleteDir(p.path, true, mustPurge, needsCm); try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication); + deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, needsCm); } catch (IOException ex) { LOG.warn("Error from deleteParentRecursive", ex); throw new MetaException("Failed to delete parent: " + ex.getMessage()); @@ -7776,7 +7816,7 @@ public void drop_function(String dbName, String funcName) if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } - Boolean isSourceOfReplication = + Boolean needsCm = ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME])); // if copy of jar to change management fails we fail the metastore transaction, since the @@ -7784,7 +7824,7 @@ public void drop_function(String dbName, String funcName) // a copy is required to allow incremental replication to work correctly. if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { for (ResourceUri uri : func.getResourceUris()) { - if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) { + if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) { wh.addToChangeManagement(new Path(uri.getUri())); } }
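A note on the Retry helper introduced above: recycle() now routes the HDFS rename through Retry so that an IOException is retried up to MAX_RETRIES times before the failure is surfaced to the caller. Below is a minimal, self-contained sketch of that usage pattern under the assumption that Retry is used exactly as in the patch; the class name RetryRenameSketch, the method renameWithRetry, and the fs/src/dst parameters are illustrative stand-ins, not fields of ReplChangeManager.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.utils.Retry;

public class RetryRenameSketch {
  // Wrap a rename the way ReplChangeManager.recycle() does: retry on IOException,
  // and rethrow whatever still fails after MAX_RETRIES attempts.
  public static boolean renameWithRetry(final FileSystem fs, final Path src, final Path dst)
      throws IOException {
    Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) {
      @Override
      public Boolean execute() throws IOException {
        return fs.rename(src, dst);
      }
    };
    try {
      return retriable.run();
    } catch (Exception e) {
      // Mirror the patch: surface the final failure as an IOException.
      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
    }
  }
}

Note that Retry.run() compares exception classes with equals(), so only the exact configured exception type triggers a retry; subclasses of IOException thrown by execute() propagate immediately.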
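A note on the per-encryption-zone cmroot handling: getCmRoot() lazily creates one CM root directory per encryption zone, using a check, synchronize, re-check sequence so that concurrent recycle() calls create the directory (with permission 700) only once, while unencrypted paths keep using the configured metastore.repl.cmrootdir. The sketch below re-expresses that pattern in isolation, assuming a ConcurrentHashMap-backed set of known zones; the names CmRootCache, knownZones and ensureCmRoot are illustrative and do not appear in the patch.

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

class CmRootCache {
  private final Set<String> knownZones = ConcurrentHashMap.newKeySet();
  private final Object lock = new Object();

  // Resolve (and lazily create) the CM root for an encryption zone, creating it
  // at most once even when several threads recycle files from the same zone.
  Path ensureCmRoot(FileSystem fs, String zonePath, String cmDirName) throws IOException {
    Path cmroot = new Path(zonePath, cmDirName);
    if (!knownZones.contains(zonePath)) {
      synchronized (lock) {
        if (!knownZones.contains(zonePath)) {
          if (!fs.exists(cmroot)) {
            fs.mkdirs(cmroot);
            fs.setPermission(cmroot, new FsPermission("700"));
          }
          knownZones.add(zonePath);
        }
      }
    }
    return cmroot;
  }
}

Registering a zone only after its cmroot exists also keeps the CMClearer thread, which iterates the same set of zones, from trying to list a directory that has not been created yet.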