diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 021874e..98268f9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -218,7 +218,10 @@ public class HiveConf extends Configuration {
     METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", ""),
     // should we do checks against the storage (usually hdfs) for operations like drop_partition
     METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false),
-
+    // Frequency of the metastore event cleaner thread, in seconds (default: 6 hours).
+    METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq", 6 * 60 * 60L),
+    // Age, in seconds, after which a partition event expires and may be cleaned (default: 7 days).
+    METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", 7 * 24 * 60 * 60L),
     // Default parameters for creating tables
     NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 783830f..6c999d3 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.EventCleanerThread;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
 import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
@@ -255,6 +256,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
     listeners = MetaStoreUtils.getMetaStoreListener(hiveConf);
+    // Start the daemon thread that periodically purges expired partition events.
+    Thread cleanerTh = new EventCleanerThread(this);
+    cleanerTh.setDaemon(true);
+    cleanerTh.start();
     return true;
} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 6e1c5e1..5b4137a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; @@ -3473,4 +3474,27 @@ public class ObjectStore implements RawStore, Configurable { } return join(storedVals,','); } + + @Override + public long cleanupEvents() { + boolean commited = false; + long delCnt; + LOG.debug("Begin executing cleanupEvents"); + Long expiryTime = HiveConf.getLongVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION); + Long curTime = System.currentTimeMillis(); + try { + openTransaction(); + Query query = pm.newQuery(MPartitionEvent.class,"curTime - eventTime > expiryTime"); + query.declareParameters("java.lang.Long curTime, java.lang.Long expiryTime"); + delCnt = query.deletePersistentAll(curTime, expiryTime); + commited = commitTransaction(); + } + finally { + if (!commited) { + rollbackTransaction(); + } + LOG.debug("Done executing cleanupEvents"); + } + return delCnt; + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index e57d8ea..d820e48 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -243,4 +243,5 @@ public interface RawStore extends 
Configurable { String tblName, short maxParts, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException;; + public abstract long cleanupEvents(); } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java index 78dd610..6b63591 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java @@ -25,6 +25,7 @@ import junit.framework.TestCase; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -46,6 +47,8 @@ public class TestMarkPartition extends TestCase{ protected void setUp() throws Exception { super.setUp(); + System.setProperty(ConfVars.METASTORE_EVENT_CLEAN_FREQ.varname, "2"); + System.setProperty(ConfVars.METASTORE_EVENT_EXPIRY_DURATION.varname, "5"); hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); @@ -56,7 +59,7 @@ public class TestMarkPartition extends TestCase{ public void testMarkingPartitionSet() throws CommandNeedRetryException, MetaException, TException, NoSuchObjectException, UnknownDBException, UnknownTableException, - InvalidPartitionException, UnknownPartitionException { + InvalidPartitionException, UnknownPartitionException, InterruptedException { HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf, null); driver = new Driver(hiveConf); driver.run("drop database if exists tmpdb cascade"); @@ -69,6 +72,8 @@ public class TestMarkPartition extends TestCase{ kvs.put("b", "'2011'"); 
msc.markPartitionForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); assert msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); + Thread.sleep(5000); + assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); kvs.put("b", "'2012'"); assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerThread.java metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerThread.java new file mode 100644 index 0000000..5bce9b4 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerThread.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.hive.metastore.events; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command; +import org.apache.hadoop.hive.metastore.RawStore; + +public class EventCleanerThread extends Thread{ + + public static final Log LOG = LogFactory.getLog(EventCleanerThread.class); + private final HMSHandler handler; + private final long timeBetweenRuns; + + /** + * @param timeBetweenRuns in seconds + * @param expiryTime in seconds + */ + public EventCleanerThread(HMSHandler handler) { + super(); + this.handler = handler; + // scale it to milliseconds + this.timeBetweenRuns = handler.getHiveConf().getLongVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ) * 1000L; + } + + @Override + public void run() { + + while(true){ + + try { + // Sleep for 6 hrs before next run. 
+ Thread.sleep(timeBetweenRuns); + long deleteCnt = handler.executeWithRetry(new Command(){ + @Override + public Long run(RawStore ms) throws Exception { + return ms.cleanupEvents(); + } + }); + if (deleteCnt > 0L){ + LOG.info("Number of events deleted from event Table: "+deleteCnt); + } + } catch (Exception e) { + LOG.error(e); + } + } + } +}