diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index af0884c2d3..df9a5a0fb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A class to clean directories after compactions. This will run in a separate thread.
@@ -55,12 +57,18 @@ public class Cleaner extends CompactorThread {
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  private long cleanerCheckInterval = 0;
+  private ReplChangeManager replChangeManager;
 
   // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
+  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
+  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
+
+  @Override
+  public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+    super.init(stop, looped);
+    replChangeManager = ReplChangeManager.getInstance(conf);
+  }
 
   @Override
   public void run() {
@@ -245,10 +253,10 @@ private void clean(CompactionInfo ci) throws MetaException {
      * Each Compaction only compacts as far as the highest txn id such that all txns below it
      * are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked
      * since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorWriteIdList and uses for more info.
-     * 
+     *
      * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
      * under an active reader.
-     * 
+     *
      * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
     * clean request for D2.
      * Cleaner checks existing locks and finds none.
@@ -258,8 +266,9 @@ private void clean(CompactionInfo ci) throws MetaException {
      * unless ValidTxnList is "capped" at highestWriteId.
      */
     final ValidWriteIdList txnList = (ci.highestWriteId > 0)
-        ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId)
-        : new ValidReaderWriteIdList();
+        ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
+            ci.highestWriteId)
+        : new ValidReaderWriteIdList();
     if (runJobAsSelf(ci.runAs)) {
       removeFiles(location, txnList);
     }
@@ -306,8 +315,8 @@ private void removeFiles(String location, ValidWriteIdList writeIdList) throws IOException {
 
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
+      replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
       fs.delete(dead, true);
     }
   }
-
 }
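
[Reviewer note] The javadoc in clean() carries the correctness argument for this hunk: the write-id list is capped at ci.highestWriteId, so the Cleaner never deletes a delta that a reader with a newer snapshot may still need. The snippet below only illustrates that capping on the same ValidReaderWriteIdList API the patch calls; the table name and write ids are made up, and it is not part of the patch.

    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class CappedWriteIdListDemo {
      public static void main(String[] args) {
        // Capped at highestWriteId = 3, mirroring the constructor call in clean().
        ValidWriteIdList capped =
            new ValidReaderWriteIdList("default.t", new long[0], new BitSet(), 3);
        System.out.println(capped.isWriteIdValid(2)); // true: at or below the cap
        System.out.println(capped.isWriteIdValid(5)); // false: above the cap, so its files are left alone
      }
    }
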
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 95fa0a987c..7c1d5f5cca 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -163,104 +163,99 @@ public boolean accept(Path p){
    * @param ifPurge if the file should skip Trash when move/delete source file.
    *                This is referred only if type is MOVE.
    * @return int
-   * @throws MetaException
+   * @throws IOException
    */
-  int recycle(Path path, RecycleType type, boolean ifPurge) throws MetaException {
+  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
     if (!enabled) {
       return 0;
     }
-    try {
-      int count = 0;
-
-      if (fs.isDirectory(path)) {
-        FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
-        for (FileStatus file : files) {
-          count += recycle(file.getPath(), type, ifPurge);
-        }
-      } else {
-        String fileCheckSum = checksumFor(path, fs);
-        Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
-
-        // set timestamp before moving to cmroot, so we can
-        // avoid race condition CM remove the file before setting
-        // timestamp
-        long now = System.currentTimeMillis();
-        fs.setTimes(path, now, -1);
-
-        boolean success = false;
-        if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
-          // If already a file with same checksum exists in cmPath, just ignore the copy/move
-          // Also, mark the operation is unsuccessful to notify that file with same name already
-          // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
-          // CM cleaner.
-          success = false;
-        } else {
-          switch (type) {
-            case MOVE: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
-              }
-              // Rename fails if the file with same name already exist.
-              success = fs.rename(path, cmPath);
-              break;
-            }
-            case COPY: {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
-              }
-              // It is possible to have a file with same checksum in cmPath but the content is
-              // partially copied or corrupted. In this case, just overwrite the existing file with
-              // new one.
-              success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
-              break;
-            }
-            default:
-              // Operation fails as invalid input
-              break;
-          }
-        }
-
-        // Ignore if a file with same content already exist in cmroot
-        // We might want to setXAttr for the new location in the future
-        if (success) {
-          // set the file owner to hive (or the id metastore run as)
-          fs.setOwner(cmPath, msUser, msGroup);
-
-          // tag the original file name so we know where the file comes from
-          // Note we currently only track the last known trace as
-          // xattr has limited capacity. We shall revisit and store all original
-          // locations if orig-loc becomes important
-          try {
-            fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", path.toString());
-          }
-
-          count++;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
-          }
-          // Need to extend the tenancy if we saw a newer file with the same content
-          fs.setTimes(cmPath, now, -1);
-        }
-
-        // Tag if we want to remain in trash after deletion.
-        // If multiple files share the same content, then
-        // any file claim remain in trash would be granted
-        if ((type == RecycleType.MOVE) && !ifPurge) {
-          try {
-            fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
-          } catch (UnsupportedOperationException e) {
-            LOG.warn("Error setting xattr for {}", cmPath.toString());
-          }
-        }
-      }
-      return count;
-    } catch (IOException e) {
-      throw new MetaException(StringUtils.stringifyException(e));
-    }
+    int count = 0;
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+      for (FileStatus file : files) {
+        count += recycle(file.getPath(), type, ifPurge);
+      }
+    } else {
+      String fileCheckSum = checksumFor(path, fs);
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
+
+      // Set the timestamp before moving to cmroot, so we avoid a race
+      // where the CM cleaner removes the file before the timestamp
+      // is set.
+      long now = System.currentTimeMillis();
+      fs.setTimes(path, now, -1);
+
+      boolean success = false;
+      if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+        // If a file with the same checksum already exists in cmPath, just ignore the copy/move.
+        // Also, mark the operation as unsuccessful to notify that a file with the same name
+        // already exists, which ensures the timestamp of cmPath is updated to avoid clean-up
+        // by the CM cleaner.
+        success = false;
+      } else {
+        switch (type) {
+          case MOVE: {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
+            }
+            // Rename fails if a file with the same name already exists.
+            success = fs.rename(path, cmPath);
+            break;
+          }
+          case COPY: {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
+            }
+            // It is possible to have a file with the same checksum in cmPath whose content is
+            // partially copied or corrupted. In this case, just overwrite the existing file
+            // with the new one.
+            success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+            break;
+          }
+          default:
+            // Operation fails as invalid input
+            break;
+        }
+      }
+
+      // Ignore if a file with the same content already exists in cmroot.
+      // We might want to setXAttr for the new location in the future.
+      if (success) {
+        // Set the file owner to hive (or the id the metastore runs as).
+        fs.setOwner(cmPath, msUser, msGroup);
+
+        // Tag the original file name so we know where the file comes from.
+        // Note we currently only track the last known trace, as xattr has
+        // limited capacity. We shall revisit and store all original
+        // locations if orig-loc becomes important.
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", path.toString());
+        }
+
+        count++;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
+        }
+        // Need to extend the tenancy if we saw a newer file with the same content.
+        fs.setTimes(cmPath, now, -1);
+      }
+
+      // Tag if we want to remain in trash after deletion.
+      // If multiple files share the same content, then any file's claim
+      // to remain in trash would be granted.
+      if ((type == RecycleType.MOVE) && !ifPurge) {
+        try {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", cmPath.toString());
+        }
+      }
+    }
+    return count;
   }
 
   // Get checksum of a file
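
[Reviewer note] The re-indentation makes this hunk noisy, but the behavior it preserves is the checksum-based dedup plus the timestamp ordering: the source file is stamped before the move so the CM cleaner cannot reap it mid-recycle, and a second recycle of identical content only refreshes the existing cmPath's timestamp. A condensed, self-contained sketch of that rule follows; moveToCmRoot and its arguments are hypothetical, and only FileSystem and Path are real Hadoop API.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CmRecycleSketch {
      /** Returns true if moved, false if identical content was already recycled. */
      static boolean moveToCmRoot(FileSystem fs, Path file, Path cmRoot, String checksum)
          throws IOException {
        Path cmPath = new Path(cmRoot, file.getName() + "_" + checksum);
        long now = System.currentTimeMillis();
        fs.setTimes(file, now, -1);     // stamp the source first: beats the CM cleaner to it
        if (fs.exists(cmPath)) {
          fs.setTimes(cmPath, now, -1); // duplicate content: just extend its tenancy
          return false;
        }
        return fs.rename(file, cmPath); // rename fails if a same-named file appears meanwhile
      }
    }
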
@@ -289,7 +284,7 @@ static public void setCmRoot(Path cmRoot) {
    * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
    * @return Path
    */
-  static Path getCMPath(Configuration conf, String name, String checkSum) throws IOException, MetaException {
+  static Path getCMPath(Configuration conf, String name, String checkSum) {
     String newFileName = name + "_" + checkSum;
     int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
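
[Reviewer note] Dropping "throws IOException, MetaException" from getCMPath is safe: the method is pure name manipulation and performs no filesystem calls. For reference, a self-contained sketch of the naming rule, "<name>_<checksum>" clamped to the NameNode's max component length; the clamp and the explicit cmRoot parameter are assumptions here, not quoted from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class CmPathSketch {
      static Path cmPathFor(Configuration conf, Path cmRoot, String name, String checkSum) {
        String newFileName = name + "_" + checkSum;
        int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
            DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
        if (newFileName.length() > maxLength) {
          newFileName = newFileName.substring(0, maxLength - 1); // assumed clamp
        }
        return new Path(cmRoot, newFileName);
      }
    }
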
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 20c10607bb..bf6a51c841 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -220,7 +220,11 @@ public boolean renameDir(Path sourcePath, Path destPath, boolean needCmRecycle) {
   }
 
   void addToChangeManagement(Path file) throws MetaException {
-    cm.recycle(file, RecycleType.COPY, true);
+    try {
+      cm.recycle(file, RecycleType.COPY, true);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean deleteDir(Path f, boolean recursive) throws MetaException {
@@ -234,15 +238,23 @@ public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException {
   public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
     // no need to create the CM recycle file for temporary tables
     if (needCmRecycle) {
-      cm.recycle(f, RecycleType.MOVE, ifPurge);
+      try {
+        cm.recycle(f, RecycleType.MOVE, ifPurge);
+      } catch (IOException e) {
+        throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
     }
     FileSystem fs = getFs(f);
     return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
   }
 
   public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
-    cm.recycle(f, RecycleType.MOVE, ifPurge);
-    return;
+    try {
+      cm.recycle(f, RecycleType.MOVE, ifPurge);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
   }
 
   public boolean isEmpty(Path path) throws IOException, MetaException {
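
[Reviewer note] All three Warehouse call sites now repeat the same IOException to MetaException translation, which keeps the Thrift-facing signatures unchanged. A follow-up could fold the repetition into one private helper on Warehouse, sketched below; the helper name is hypothetical, and cm is the class's existing ReplChangeManager field.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    // Inside Warehouse:
    private void recycleOrThrow(Path p, RecycleType type, boolean ifPurge) throws MetaException {
      try {
        cm.recycle(p, type, ifPurge);
      } catch (IOException e) {
        throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
      }
    }
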