diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 16f6c1c..f011127 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -439,7 +439,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     REPLDIR("hive.repl.rootdir","/user/hive/repl/",
         "HDFS root dir for all replication dumps."),
     REPLCMENABLED("hive.repl.cm.enabled", false,
-        "Turn on ChangeManager, so delete files will goes to cmrootdir."),
+        "Turn on ChangeManager, so delete files will go to cmrootdir."),
     REPLCMDIR("hive.repl.cmrootdir","/user/hive/cmroot/",
         "Root dir for ChangeManager, used for deleted files."),
     REPLCMRETIAN("hive.repl.cm.retain","24h",
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
index 19d3125..1011d34 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
@@ -25,13 +25,13 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.junit.Test;
 
 public class TestWarehousePartitionHelper {
 
-  private static final Configuration CONFIGURATION = new Configuration();
+  private static final HiveConf CONFIGURATION = new HiveConf();
   private static final Path TABLE_PATH = new Path("table");
   private static final List<String> UNPARTITIONED_COLUMNS = Collections.emptyList();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 0587221..205c640 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,6 +63,7 @@ public static void setUp() throws Exception {
     hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
     cmroot = "hdfs://" + m_dfs.getNameNode().getHostAndPort() + "/cmroot";
     hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+    hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
     warehouse = new Warehouse(hiveConf);
     try {
       client = new HiveMetaStoreClient(hiveConf);
@@ -104,7 +106,8 @@ public void testRecyclePartTable() throws Exception {
     // Create db1/t1/dt=20160101/part
     //               /dt=20160102/part
     //               /dt=20160103/part
-    // Test: recycle single partition (dt=20160101)
+    // Test: recycle single file (dt=20160101/part)
+    //       recycle single partition (dt=20160102)
     //       recycle table t1
     String dbName = "db1";
     client.dropDatabase(dbName, true, true);
@@ -148,44 +151,37 @@ public void testRecyclePartTable() throws Exception {
     Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part");
     createFile(part1Path, "p1");
-    String path1Sig = ReplChangeManager.getCksumString(part1Path, hiveConf);
+    String path1Chksum = ReplChangeManager.getCksumString(part1Path, hiveConf);
     Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part");
     createFile(part2Path, "p2");
-    String path2Sig = ReplChangeManager.getCksumString(part2Path, hiveConf);
+    String path2Chksum = ReplChangeManager.getCksumString(part2Path, hiveConf);
     Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part");
     createFile(part3Path, "p3");
-    String path3Sig = ReplChangeManager.getCksumString(part3Path, hiveConf);
+    String path3Chksum = ReplChangeManager.getCksumString(part3Path, hiveConf);
 
     Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
     Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
     Assert.assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
 
-    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
     // verify cm.recycle(db, table, part) api moves file to cmroot dir
-    int ret = cm.recycle(db, tbl, part1);
-    Assert.assertEquals(ret, 0);
-    ret = cm.recycle(db, tbl, part2);
-    Assert.assertEquals(ret, 0);
-    ret = cm.recycle(db, tbl, part3);
-    Assert.assertEquals(ret, 0);
-
-    Assert.assertFalse(part1Path.getFileSystem(hiveConf).exists(part1Path));
-    Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
-    Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
-
-    client.dropPartition(dbName, tblName, Arrays.asList("20160101"));
-
-    Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Sig);
+    int ret = cm.recycle(part1Path, false);
+    Assert.assertEquals(ret, 1);
+    Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Chksum);
     Assert.assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
 
-    client.dropTable(dbName, tblName);
-
-    Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Sig);
+    // Verify dropPartition recycles part files
+    client.dropPartition(dbName, tblName, Arrays.asList("20160102"));
+    Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Chksum);
     Assert.assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
 
-    Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Sig);
+    // Verify dropTable recycles partition files
+    client.dropTable(dbName, tblName);
+    Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
+    Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Chksum);
     Assert.assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
 
     client.dropDatabase(dbName, true, true);
@@ -225,42 +221,37 @@ public void testRecycleNonPartTable() throws Exception {
     Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1");
     createFile(filePath1, "f1");
-    String fileSig1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
+    String fileChksum1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
     Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2");
     createFile(filePath2, "f2");
-    String fileSig2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
+    String fileChksum2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
     Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3");
     createFile(filePath3, "f3");
-    String fileSig3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
-
+    String fileChksum3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
 
     Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
     Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
     Assert.assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
 
-    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf);
     // verify cm.recycle(Path) api moves file to cmroot dir
-    cm.recycle(filePath1);
+    cm.recycle(filePath1, false);
     Assert.assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
-    Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileSig1);
+    Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileChksum1);
     Assert.assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
 
-    // verify cm.recycle(db, table) api moves file to cmroot dir
-    int ret = cm.recycle(db, tbl);
-    Assert.assertEquals(ret, 0);
-
-    Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
-    Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
-
+    // Verify dropTable recycles table files
     client.dropTable(dbName, tblName);
 
-    Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileSig2);
+    Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileChksum2);
+    Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
     Assert.assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
 
-    Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileSig3);
+    Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileChksum3);
+    Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
     Assert.assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
 
     client.dropDatabase(dbName, true, true);
@@ -268,32 +259,50 @@ public void testRecycleNonPartTable() throws Exception {
 
   @Test
   public void testClearer() throws Exception {
-    FileSystem fs = new Path(cmroot).getFileSystem(hiveConf);
+    FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf);
     long now = System.currentTimeMillis();
 
-    Path dirDb = new Path(cmroot, "db3");
+    Path dirDb = new Path(warehouse.getWhRoot(), "db3");
     fs.mkdirs(dirDb);
     Path dirTbl1 = new Path(dirDb, "tbl1");
     fs.mkdirs(dirTbl1);
     Path part11 = new Path(dirTbl1, "part1");
-    fs.create(part11).close();
-    fs.setTimes(part11, now - 86400*1000*2, now - 86400*1000*2);
+    createFile(part11, "testClearer11");
+    String fileChksum11 = ReplChangeManager.getCksumString(part11, hiveConf);
     Path part12 = new Path(dirTbl1, "part2");
-    fs.create(part12).close();
+    createFile(part12, "testClearer12");
+    String fileChksum12 = ReplChangeManager.getCksumString(part12, hiveConf);
     Path dirTbl2 = new Path(dirDb, "tbl2");
     fs.mkdirs(dirTbl2);
     Path part21 = new Path(dirTbl2, "part1");
-    fs.create(part21).close();
-    fs.setTimes(part21, now - 86400*1000*2, now - 86400*1000*2);
+    createFile(part21, "testClearer21");
+    String fileChksum21 = ReplChangeManager.getCksumString(part21, hiveConf);
     Path part22 = new Path(dirTbl2, "part2");
-    fs.create(part22).close();
+    createFile(part22, "testClearer22");
+    String fileChksum22 = ReplChangeManager.getCksumString(part22, hiveConf);
     Path dirTbl3 = new Path(dirDb, "tbl3");
     fs.mkdirs(dirTbl3);
     Path part31 = new Path(dirTbl3, "part1");
-    fs.create(part31).close();
-    fs.setTimes(part31, now - 86400*1000*2, now - 86400*1000*2);
+    createFile(part31, "testClearer31");
+    String fileChksum31 = ReplChangeManager.getCksumString(part31, hiveConf);
     Path part32 = new Path(dirTbl3, "part2");
-    fs.create(part32).close();
-    fs.setTimes(part32, now - 86400*1000*2, now - 86400*1000*2);
+    createFile(part32, "testClearer32");
+    String fileChksum32 = ReplChangeManager.getCksumString(part32, hiveConf);
+
+    ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false);
+    ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false);
+    ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, true);
+
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11)));
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12)));
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21)));
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22)));
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31)));
+    Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32)));
+
+    fs.setTimes(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31), now - 86400*1000*2, now - 86400*1000*2);
+    fs.setTimes(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32), now - 86400*1000*2, now - 86400*1000*2);
 
     ReplChangeManager.scheduleCMClearer(hiveConf);
 
@@ -306,11 +315,12 @@ public void testClearer() throws Exception {
       if (end - start > 5000) {
         Assert.fail("timeout, cmroot has not been cleared");
       }
-      if (!part11.getFileSystem(hiveConf).exists(part11) &&
-          part12.getFileSystem(hiveConf).exists(part12) &&
-          !part21.getFileSystem(hiveConf).exists(part21) &&
-          part22.getFileSystem(hiveConf).exists(part22) &&
-          !dirTbl3.getFileSystem(hiveConf).exists(dirTbl3)) {
+      if (!fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11)) &&
+          fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12)) &&
+          !fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21)) &&
+          fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22)) &&
+          !fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31)) &&
+          !fs.exists(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32))) {
         cleared = true;
       }
     } while (!cleared);
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index a49a80e..99cba9d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -30,14 +29,13 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -50,129 +48,124 @@
   private static boolean inited = false;
   private static boolean enabled = false;
   private static Path cmroot;
-  private static HiveConf conf;
-  private static Warehouse wh;
-  private String user;
-  private String group;
+  private static HiveConf hiveConf;
+  private String msUser;
+  private String msGroup;
+  private FileSystem fs;
 
-  public static ReplChangeManager getInstance(HiveConf conf, Warehouse wh) throws IOException {
+  public static final String ORIG_LOC_TAG = "user.original-loc";
+  public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+
+  public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException {
     if (instance == null) {
-      instance = new ReplChangeManager(conf, wh);
+      instance = new ReplChangeManager(hiveConf);
     }
     return instance;
   }
 
-  ReplChangeManager(HiveConf conf, Warehouse wh) throws IOException {
-    if (!inited) {
-      if (conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
-        ReplChangeManager.enabled = true;
-        ReplChangeManager.cmroot = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
-        ReplChangeManager.conf = conf;
-        ReplChangeManager.wh = wh;
+  ReplChangeManager(HiveConf hiveConf) throws MetaException {
+    try {
+      if (!inited) {
+        if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+          ReplChangeManager.enabled = true;
+          ReplChangeManager.cmroot = new Path(hiveConf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+          ReplChangeManager.hiveConf = hiveConf;
 
-        FileSystem fs = cmroot.getFileSystem(conf);
-        // Create cmroot with permission 700 if not exist
-        if (!fs.exists(cmroot)) {
-          fs.mkdirs(cmroot);
-          fs.setPermission(cmroot, new FsPermission("700"));
+          fs = cmroot.getFileSystem(hiveConf);
+          // Create cmroot with permission 700 if it does not exist
+          if (!fs.exists(cmroot)) {
+            fs.mkdirs(cmroot);
+            fs.setPermission(cmroot, new FsPermission("700"));
+          }
+          UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
+          msUser = usergroupInfo.getShortUserName();
+          msGroup = usergroupInfo.getPrimaryGroupName();
         }
-        UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
-        user = usergroupInfo.getShortUserName();
-        group = usergroupInfo.getPrimaryGroupName();
+        inited = true;
       }
-      inited = true;
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
    }
  }
 
-  /***
-   * Recycle a managed table, move table files to cmroot
-   * @param db
-   * @param table
-   * @return
-   * @throws IOException
-   * @throws MetaException
-   */
-  public int recycle(Database db, Table table) throws IOException, MetaException {
-    if (!enabled) {
-      return 0;
+  // Filter out files starting with ".". Note Hadoop considers files starting
+  // with "." or "_" hidden, but we do need to replicate files starting with
+  // "_". We found at least 2 use cases:
+  // 1. for har files, _index and _masterindex are required files;
+  // 2. the _SUCCESS file is required by Oozie to indicate availability of a data source.
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept(Path p){
+      return !p.getName().startsWith(".");
     }
-
-    Path tablePath = wh.getTablePath(db, table.getTableName());
-    FileSystem fs = tablePath.getFileSystem(conf);
-    int failCount = 0;
-    for (FileStatus file : fs.listStatus(tablePath)) {
-      if (!recycle(file.getPath())) {
-        failCount++;
-      }
-    }
-    return failCount;
-  }
+  };
 
   /***
-   * Recycle a partition of a managed table, move partition files to cmroot
-   * @param db
-   * @param table
-   * @param part
+   * Move a path into cmroot. If the path is a directory (of a partition, or of a table
+   * if non-partitioned), recursively move the files inside the directory to cmroot.
+   * Note the table must be a managed table
+   * @param path a single file or directory
+   * @param ifPurge if the file should skip Trash when deleted
    * @return
-   * @throws IOException
    * @throws MetaException
    */
-  public int recycle(Database db, Table table, Partition part) throws IOException, MetaException {
+  public int recycle(Path path, boolean ifPurge) throws MetaException {
     if (!enabled) {
       return 0;
     }
 
-    Map pm = Warehouse.makeSpecFromValues(table.getPartitionKeys(), part.getValues());
-    Path partPath = wh.getPartitionPath(db, table.getTableName(), pm);
-    FileSystem fs = partPath.getFileSystem(conf);
-    int failCount = 0;
-    for (FileStatus file : fs.listStatus(partPath)) {
-      if (!recycle(file.getPath())) {
-        failCount++;
-      }
-    }
-    return failCount;
-  }
+    try {
+      int count = 0;
 
-  /***
-   * Recycle a single file (of a partition, or table if nonpartitioned),
-   * move files to cmroot. Note the table must be managed table
-   * @param path
-   * @return
-   * @throws IOException
-   * @throws MetaException
-   */
-  public boolean recycle(Path path) throws IOException, MetaException {
-    if (!enabled) {
-      return true;
-    }
+      if (fs.isDirectory(path)) {
+        FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+        for (FileStatus file : files) {
+          count += recycle(file.getPath(), ifPurge);
+        }
+      } else {
+        Path cmPath = getCMPath(path, hiveConf, getCksumString(path, hiveConf));
 
-    Path cmPath = getCMPath(path, conf, getCksumString(path, conf));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
+        }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
-    }
+        // Set the timestamp before moving to cmroot, so we avoid a
+        // race condition where CM removes the file before the
+        // timestamp is set
+        long now = System.currentTimeMillis();
+        fs.setTimes(path, now, now);
 
-    FileSystem fs = path.getFileSystem(conf);
+        boolean succ = fs.rename(path, cmPath);
+        // Ignore if a file with the same content already exists in cmroot
+        // We might want to setXAttr for the new location in the future
+        if (!succ) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("A file with the same content of " + path.toString() + " already exists, ignore");
+          }
+          // Need to extend the tenancy if we see a newer file with the same content
+          fs.setTimes(cmPath, now, now);
+        } else {
 
-    boolean succ = fs.rename(path, cmPath);
-    // Ignore if a file with same content already exist in cmroot
-    // We might want to setXAttr for the new location in the future
-    if (!succ) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("A file with the same content of " + path.toString() + " already exists, ignore");
-      }
-    } else {
-      long now = System.currentTimeMillis();
-      fs.setTimes(cmPath, now, now);
+          // Set the file owner to hive (or the user the metastore runs as)
+          fs.setOwner(cmPath, msUser, msGroup);
 
-      // set the file owner to hive (or the id metastore run as)
-      fs.setOwner(cmPath, user, group);
+          // Tag the original file name so we know where the file came from.
+          // Note we currently only track the last known trace, as
+          // xattrs have limited capacity. We shall revisit and store all original
+          // locations if orig-loc becomes important
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
 
-      // tag the original file name so we know where the file comes from
-      fs.setXAttr(cmPath, "user.original-loc", path.toString().getBytes());
+          count++;
+        }
 
+        // Tag the file if we want it to remain in trash after deletion.
+        // If multiple files share the same content, then
+        // any file claiming to remain in trash is honored
+        if (!ifPurge) {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+        }
+      }
+      return count;
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
     }
-    return succ;
   }
 
   // Get checksum of a file
@@ -189,17 +182,17 @@ static public String getCksumString(Path path, Configuration conf) throws IOExce
   /***
    * Convert a path of file inside a partition or table (if non-partitioned)
    * to a deterministic location of cmroot. So user can retrieve the file back
-   * with the original location plus signature.
+   * with the original location plus checksum.
    * @param path original path inside partition or table
    * @param conf
-   * @param signature unique signature of the file, can be retrieved by {@link getSignature}
+   * @param chksum checksum of the file, can be retrieved by {@link getCksumString}
    * @return
    * @throws IOException
    * @throws MetaException
    */
-  static public Path getCMPath(Path path, Configuration conf, String signature)
+  static public Path getCMPath(Path path, Configuration conf, String chksum)
       throws IOException, MetaException {
-    String newFileName = signature + path.getName();
+    String newFileName = chksum;
 
     int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
@@ -218,64 +211,64 @@ static public Path getCMPath(Path path, Configuration conf, String signature)
   static class CMClearer implements Runnable {
     private Path cmroot;
     private long secRetain;
-    private Configuration conf;
+    private HiveConf hiveConf;
 
-    CMClearer(String cmrootString, long secRetain, Configuration conf) {
+    CMClearer(String cmrootString, long secRetain, HiveConf hiveConf) {
       this.cmroot = new Path(cmrootString);
       this.secRetain = secRetain;
-      this.conf = conf;
+      this.hiveConf = hiveConf;
    }
 
    @Override
    public void run() {
      try {
        LOG.info("CMClearer started");
+
        long now = System.currentTimeMillis();
-        processDir(cmroot, now);
-      } catch (IOException e) {
-        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
-      }
-    }
+        FileSystem fs = cmroot.getFileSystem(hiveConf);
+        FileStatus[] files = fs.listStatus(cmroot);
 
-    private boolean processDir(Path folder, long now) throws IOException {
-      FileStatus[] files = folder.getFileSystem(conf).listStatus(folder);
-      boolean empty = true;
-      for (FileStatus file : files) {
-        if (file.isDirectory()) {
-          if (processDir(file.getPath(), now)) {
-            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Remove " + file.toString());
-            }
-          } else {
-            empty = false;
-          }
-        } else {
+        for (FileStatus file : files) {
          long modifiedTime = file.getModificationTime();
          if (now - modifiedTime > secRetain*1000) {
-            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
-          } else {
-            empty = false;
+            if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+              boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
+              if (succ) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Move " + file.toString() + " to trash");
+                }
+              } else {
+                LOG.warn("Failed to move " + file.toString() + " to trash");
+              }
+            } else {
+              boolean succ = fs.delete(file.getPath(), false);
+              if (succ) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Remove " + file.toString());
+                }
+              } else {
+                LOG.warn("Failed to remove " + file.toString());
+              }
+            }
          }
        }
+      } catch (IOException e) {
+        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
      }
-      return empty;
    }
  }
 
  // Schedule CMClearer thread. Will be invoked by metastore
  public static void scheduleCMClearer(HiveConf hiveConf) {
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.REPLCMENABLED)) {
      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
          new BasicThreadFactory.Builder()
          .namingPattern("cmclearer-%d")
          .daemon(true)
          .build());
      executor.scheduleAtFixedRate(new CMClearer(hiveConf.get(HiveConf.ConfVars.REPLCMDIR.varname),
-          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), hiveConf),
-          0,
-          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMINTERVAL,
-          TimeUnit.SECONDS), TimeUnit.SECONDS);
+          hiveConf.getTimeVar(ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), hiveConf),
+          0, hiveConf.getTimeVar(ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
    }
  }
 }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 549ca52..a65a2e7 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -66,6 +66,7 @@
   private MetaStoreFS fsHandler = null;
   private boolean storageAuthCheck = false;
+  private ReplChangeManager cm = null;
 
   public Warehouse(Configuration conf) throws MetaException {
     this.conf = conf;
@@ -75,6 +76,7 @@ public Warehouse(Configuration conf) throws MetaException {
           + " is not set in the config or blank");
     }
     fsHandler = getMetaStoreFsHandler(conf);
+    cm = ReplChangeManager.getInstance((HiveConf)conf);
     storageAuthCheck = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS);
   }
@@ -213,6 +215,7 @@ public boolean deleteDir(Path f, boolean recursive) throws MetaException {
   }
 
   public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException {
+    cm.recycle(f, ifPurge);
     FileSystem fs = getFs(f);
     return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
   }
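A minimal usage sketch of the API this patch introduces, for reviewers who want to walk the recycle/retrieve cycle by hand. It is not part of the patch: the warehouse path below is hypothetical, and it assumes a HiveConf with hive.repl.cm.enabled and hive.repl.cmrootdir set the same way the test setUp above configures them.

// Sketch only: exercises ReplChangeManager as changed in this patch.
// Assumes hive.repl.cm.enabled=true and a reachable cmroot; the path
// /user/hive/warehouse/db1.db/t1/part is a hypothetical example.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;

public class ReplCMExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
    conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "/user/hive/cmroot/");

    Path deleted = new Path("/user/hive/warehouse/db1.db/t1/part");

    // Capture the checksum first: after recycle() the file only exists
    // under cmroot, and the checksum is the key to its new name there.
    String chksum = ReplChangeManager.getCksumString(deleted, conf);

    // ifPurge=false tags the copy with user.remain-in-trash, so the
    // CMClearer moves it to Trash instead of deleting it outright once
    // hive.repl.cm.retain expires.
    int count = ReplChangeManager.getInstance(conf).recycle(deleted, false);

    // A replication consumer rebuilds the cmroot location
    // deterministically from the original path plus the checksum.
    Path cmPath = ReplChangeManager.getCMPath(deleted, conf, chksum);
    FileSystem fs = cmPath.getFileSystem(conf);
    System.out.println("recycled " + count + " file(s) to " + cmPath
        + ", exists=" + fs.exists(cmPath));
  }
}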