Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1138719)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -218,7 +218,8 @@
METASTORE_EVENT_LISTENERS("hive.metastore.event.listeners", ""),
// should we do checks against the storage (usually hdfs) for operations like drop_partition
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false),
-
+ METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq",0L),
+ METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration",0L),
// Default parameters for creating tables
NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1138719)
+++ conf/hive-default.xml (working copy)
@@ -233,6 +233,18 @@
+ hive.metastore.event.expiry.duration
+ 0L
+ Duration after which events expire from events table (in seconds)
+
+
+
+ hive.metastore.event.clean.freq
+ 0L
+ Frequency at which timer task runs to purge expired events in metastore(in seconds).
+
+
+
hive.metastore.connect.retries
5
Number of retries while opening a connection to metastore
Index: metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (revision 1138719)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (working copy)
@@ -31,6 +31,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -42,6 +43,7 @@
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.metrics.Metrics;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
import org.apache.hadoop.hive.metastore.api.Constants;
@@ -76,6 +78,7 @@
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
@@ -255,6 +258,12 @@
}
}
listeners = MetaStoreUtils.getMetaStoreListener(hiveConf);
+ long cleanFreq = hiveConf.getLongVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ) * 1000L;
+ if(cleanFreq > 0){
+ // In default config, there is no timer.
+ Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
+ cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
+ }
return true;
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1138719)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy)
@@ -53,6 +53,7 @@
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
@@ -3586,4 +3587,27 @@
}
return join(storedVals,',');
}
+
+ @Override
+ public long cleanupEvents() {
+ boolean commited = false;
+ long delCnt;
+ LOG.debug("Begin executing cleanupEvents");
+ Long expiryTime = HiveConf.getLongVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION) * 1000L;
+ Long curTime = System.currentTimeMillis();
+ try {
+ openTransaction();
+ Query query = pm.newQuery(MPartitionEvent.class,"curTime - eventTime > expiryTime");
+ query.declareParameters("java.lang.Long curTime, java.lang.Long expiryTime");
+ delCnt = query.deletePersistentAll(curTime, expiryTime);
+ commited = commitTransaction();
+ }
+ finally {
+ if (!commited) {
+ rollbackTransaction();
+ }
+ LOG.debug("Done executing cleanupEvents");
+ }
+ return delCnt;
+ }
}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (revision 1138719)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (working copy)
@@ -286,4 +286,6 @@
public abstract List listPartitionsPsWithAuth(String db_name, String tbl_name,
List part_vals, short max_parts, String userName, List groupNames)
throws MetaException, InvalidObjectException;
-}
\ No newline at end of file
+
+ public abstract long cleanupEvents();
+}
Index: metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java (revision 0)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/events/EventCleanerTask.java (revision 0)
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
+import org.apache.hadoop.hive.metastore.RawStore;
+
+public class EventCleanerTask extends TimerTask{
+
+ public static final Log LOG = LogFactory.getLog(EventCleanerTask.class);
+ private final HMSHandler handler;
+
+ public EventCleanerTask(HMSHandler handler) {
+ super();
+ this.handler = handler;
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ long deleteCnt = handler.executeWithRetry(new Command(){
+ @Override
+ public Long run(RawStore ms) throws Exception {
+ return ms.cleanupEvents();
+ }
+ });
+ if (deleteCnt > 0L){
+ LOG.info("Number of events deleted from event Table: "+deleteCnt);
+ }
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+}
Index: metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
===================================================================
--- metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java (revision 1138719)
+++ metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java (working copy)
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -46,6 +47,8 @@
protected void setUp() throws Exception {
super.setUp();
+ System.setProperty(ConfVars.METASTORE_EVENT_CLEAN_FREQ.varname, "2");
+ System.setProperty(ConfVars.METASTORE_EVENT_EXPIRY_DURATION.varname, "5");
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -56,7 +59,7 @@
public void testMarkingPartitionSet() throws CommandNeedRetryException, MetaException,
TException, NoSuchObjectException, UnknownDBException, UnknownTableException,
- InvalidPartitionException, UnknownPartitionException {
+ InvalidPartitionException, UnknownPartitionException, InterruptedException {
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf, null);
driver = new Driver(hiveConf);
driver.run("drop database if exists tmpdb cascade");
@@ -69,6 +72,8 @@
kvs.put("b", "'2011'");
msc.markPartitionForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
assert msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ Thread.sleep(10000);
+ assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
kvs.put("b", "'2012'");
assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);