Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1138719) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -218,7 +218,8 @@ METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", ""), // should we do checks against the storage (usually hdfs) for operations like drop_partition METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false), - + METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq",0L), + METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration",0L), // Default parameters for creating tables NEWTABLEDEFAULTPARA("hive.table.parameters.default",""), Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1138719) +++ conf/hive-default.xml (working copy) @@ -233,6 +233,18 @@ + hive.metastore.event.expiry.duration + 0L + Duration after which events expire from events table (in seconds) + + + + hive.metastore.event.clean.freq + 0L + Frequency at which timer task runs to purge expired events in metastore(in seconds). + + + hive.metastore.connect.retries 5 Number of retries while opening a connection to metastore Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 1138719) +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy) @@ -31,6 +31,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Timer; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.common.metrics.Metrics; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; import org.apache.hadoop.hive.metastore.api.Constants; @@ -76,6 +78,7 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.EventCleanerTask; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; @@ -255,6 +258,12 @@ } } listeners = MetaStoreUtils.getMetaStoreListener(hiveConf); + long cleanFreq = hiveConf.getLongVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ) * 1000L; + if(cleanFreq > 0){ + // In default config, there is no timer. + Timer cleaner = new Timer("Metastore Events Cleaner Thread", true); + cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq); + } return true; } Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1138719) +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy) @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; @@ -3586,4 +3587,27 @@ } return join(storedVals,','); } + + @Override + public long cleanupEvents() { + boolean commited = false; + long delCnt; + LOG.debug("Begin executing cleanupEvents"); + Long expiryTime = HiveConf.getLongVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION) * 1000L; + Long curTime = System.currentTimeMillis(); + try { + openTransaction(); + Query query = pm.newQuery(MPartitionEvent.class,"curTime - eventTime > expiryTime"); + query.declareParameters("java.lang.Long curTime, java.lang.Long expiryTime"); + delCnt = query.deletePersistentAll(curTime, expiryTime); + commited = commitTransaction(); + } + finally { + if (!commited) { + rollbackTransaction(); + } + LOG.debug("Done executing cleanupEvents"); + } + return delCnt; + } } Index: metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (revision 1138719) +++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (working copy) @@ -286,4 +286,6 @@ public abstract List listPartitionsPsWithAuth(String db_name, String tbl_name, List part_vals, short max_parts, String userName, List groupNames) throws MetaException, InvalidObjectException; -} \ No newline at end of file + + public abstract long cleanupEvents(); +} Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java (revision 0) +++ metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java (revision 0) @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.events; + +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command; +import org.apache.hadoop.hive.metastore.RawStore; + +public class EventCleanerTask extends TimerTask{ + + public static final Log LOG = LogFactory.getLog(EventCleanerTask.class); + private final HMSHandler handler; + + public EventCleanerTask(HMSHandler handler) { + super(); + this.handler = handler; + } + + @Override + public void run() { + + try { + long deleteCnt = handler.executeWithRetry(new Command(){ + @Override + public Long run(RawStore ms) throws Exception { + return ms.cleanupEvents(); + } + }); + if (deleteCnt > 0L){ + LOG.info("Number of events deleted from event Table: "+deleteCnt); + } + } catch (Exception e) { + LOG.error(e); + } + } +} Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java =================================================================== --- metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java (revision 1138719) +++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -46,6 +47,8 @@ protected void setUp() throws Exception { super.setUp(); + System.setProperty(ConfVars.METASTORE_EVENT_CLEAN_FREQ.varname, "2"); + System.setProperty(ConfVars.METASTORE_EVENT_EXPIRY_DURATION.varname, "5"); hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); @@ -56,7 +59,7 @@ public void testMarkingPartitionSet() throws CommandNeedRetryException, MetaException, TException, NoSuchObjectException, UnknownDBException, UnknownTableException, - InvalidPartitionException, UnknownPartitionException { + InvalidPartitionException, UnknownPartitionException, InterruptedException { HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf, null); driver = new Driver(hiveConf); driver.run("drop database if exists tmpdb cascade"); @@ -69,6 +72,8 @@ kvs.put("b", "'2011'"); msc.markPartitionForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); assert msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); + Thread.sleep(10000); + assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE); kvs.put("b", "'2012'"); assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);