diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f7e3e3a..bd9c241 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; @@ -86,23 +87,17 @@ // HiveConf rather than a Configuration. private HiveConf hiveConf; private MessageFactory msgFactory; - private RawStore rs; - - private synchronized void init(HiveConf conf) { - try { - rs = RawStoreProxy.getProxy(conf, conf, - conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999); - } catch (MetaException e) { - LOG.error("Unable to connect to raw store, notifications will not be tracked", e); - rs = null; - } - if (cleaner == null && rs != null) { - cleaner = new CleanerThread(conf, rs); + + private synchronized void init(HiveConf conf) throws MetaException { + if (cleaner == null) { + cleaner = + new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999)); cleaner.start(); } } - public DbNotificationListener(Configuration config) { + public DbNotificationListener(Configuration config) throws MetaException { super(config); // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor // with a Configuration parameter, so we have to declare config as Configuration. But it @@ -473,16 +468,19 @@ private int now() { } // Process this notification by adding it to metastore DB - private void process(NotificationEvent event) { + private void process(NotificationEvent event) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - if (rs != null) { - synchronized (NOTIFICATION_TBL_LOCK) { - LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), - event.getMessage()); - rs.addNotificationEvent(event); + synchronized (NOTIFICATION_TBL_LOCK) { + LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), + event.getMessage()); + RawStore threadLocalRS = HMSHandler.getRawStore(); + if (threadLocalRS == null) { + threadLocalRS = + RawStoreProxy.getProxy(hiveConf, hiveConf, + hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999); + HMSHandler.setRawStore(threadLocalRS); } - } else { - LOG.warn("Dropping event " + event + " since notification is not running."); + threadLocalRS.addNotificationEvent(event); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 07eca38..99a6835 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -262,6 +262,10 @@ public static void removeRawStore() { threadLocalMS.remove(); } + public static void setRawStore(RawStore rs) { + threadLocalMS.set(rs); + } + // Thread local configuration is needed as many threads could make changes // to the conf using the connection hook private static final ThreadLocal threadLocalConf =