diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 24c5db0e47..e9480a9168 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -449,6 +449,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "lead to an task increment that would cross the above limit"),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
         "Number of threads that will be used to dump partition data information during repl dump."),
+    REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Frequency at which timer task runs to purge expired dump dirs."),
+    REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d",
+        new TimeValidator(TimeUnit.DAYS),
+        "TTL of dump dirs before cleanup."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 5812a1bf4f..9564e39c22 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -127,6 +127,7 @@
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
 import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
@@ -538,6 +539,12 @@ public void init() throws MetaException {
         cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
       }
+      cleanFreq = hiveConf.getTimeVar(ConfVars.REPL_DUMPDIR_CLEAN_FREQ, TimeUnit.MILLISECONDS);
+      if (cleanFreq > 0) {
+        // In default config, there is no timer.
+        Timer cleaner = new Timer("Repl Dump Dir Cleaner Thread", true);
+        cleaner.schedule(new DumpDirCleanerTask(hiveConf), cleanFreq, cleanFreq);
+      }
       expressionProxy = PartFilterExprUtil.createExpressionProxy(hiveConf);
       fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf);
     }
 
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/repl/DumpDirCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/repl/DumpDirCleanerTask.java
new file mode 100644
index 0000000000..3374789385
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/repl/DumpDirCleanerTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.repl;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Timer task that purges replication dump directories under the repl root dir
+ * (hive.repl.rootdir) whose modification time is older than the configured TTL
+ * (hive.repl.dumpdir.ttl). Scheduled by HiveMetaStore when
+ * hive.repl.dumpdir.clean.freq is set to a positive interval.
+ */
+public class DumpDirCleanerTask extends TimerTask {
+  public static final Logger LOG = LoggerFactory.getLogger(DumpDirCleanerTask.class);
+  private final HiveConf conf;
+  private final Path dumpRoot;
+  private final long ttl;
+
+  public DumpDirCleanerTask(HiveConf conf) {
+    this.conf = conf;
+    dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR));
+    ttl = conf.getTimeVar(ConfVars.REPL_DUMPDIR_TTL, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void run() {
+    LOG.debug("Trying to delete old dump dirs");
+    try {
+      FileSystem fs = FileSystem.get(dumpRoot.toUri(), conf);
+      // Nothing to clean until the first repl dump creates the root dir;
+      // avoids a FileNotFoundException from listStatus on every run.
+      if (!fs.exists(dumpRoot)) {
+        return;
+      }
+      long expiry = System.currentTimeMillis() - ttl;
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() < expiry) {
+          fs.delete(status.getPath(), true);
+          LOG.info("Deleted old dump dir: {}", status.getPath());
+        }
+      }
+    } catch (Exception e) {
+      // Catch broadly: an exception escaping TimerTask.run() cancels the
+      // Timer, which would permanently disable dump dir cleanup.
+      LOG.error("Error while trying to delete dump dir", e);
+    }
+  }
+}