diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f7e3e3a..ea6cb79 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; @@ -86,23 +87,17 @@ // HiveConf rather than a Configuration. private HiveConf hiveConf; private MessageFactory msgFactory; - private RawStore rs; - - private synchronized void init(HiveConf conf) { - try { - rs = RawStoreProxy.getProxy(conf, conf, - conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999); - } catch (MetaException e) { - LOG.error("Unable to connect to raw store, notifications will not be tracked", e); - rs = null; - } - if (cleaner == null && rs != null) { - cleaner = new CleanerThread(conf, rs); + + private synchronized void init(HiveConf conf) throws MetaException { + if (cleaner == null) { + cleaner = + new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999)); cleaner.start(); } } - public DbNotificationListener(Configuration config) { + public DbNotificationListener(Configuration config) throws MetaException { super(config); // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor // with a Configuration parameter, so we have to declare config as Configuration. But it @@ -473,16 +468,12 @@ private int now() { } // Process this notification by adding it to metastore DB - private void process(NotificationEvent event) { + private void process(NotificationEvent event) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - if (rs != null) { - synchronized (NOTIFICATION_TBL_LOCK) { - LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), - event.getMessage()); - rs.addNotificationEvent(event); - } - } else { - LOG.warn("Dropping event " + event + " since notification is not running."); + synchronized (NOTIFICATION_TBL_LOCK) { + LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), + event.getMessage()); + HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 07eca38..80b1e98 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -227,7 +227,6 @@ public TTransport getTransport(TTransport trans) { public static class HMSHandler extends FacebookBase implements IHMSHandler, ThreadLocalRawStore { public static final Logger LOG = HiveMetaStore.LOG; - private String rawStoreClassName; private final HiveConf hiveConf; // stores datastore (jpox) properties, // right now they come from jpox.properties @@ -409,7 +408,6 @@ public HiveConf getHiveConf() { @Override public void init() throws MetaException { - rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); initListeners = MetaStoreUtils.getMetaStoreListeners( MetaStoreInitListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); @@ -507,7 +505,7 @@ public Object getValue() { fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf); } - private String addPrefix(String s) { + private static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } @@ -582,9 +580,14 @@ public String getMetaConf(String key) throws MetaException { @InterfaceStability.Evolving @Override public RawStore getMS() throws MetaException { + Configuration conf = getConf(); + return getMSForConf(conf); + } + + public static RawStore getMSForConf(Configuration conf) throws MetaException { RawStore ms = threadLocalMS.get(); if (ms == null) { - ms = newRawStore(); + ms = newRawStoreForConf(conf); ms.verifySchema(); threadLocalMS.set(ms); ms = threadLocalMS.get(); @@ -601,24 +604,23 @@ private TxnStore getTxnHandler() { return txn; } - private RawStore newRawStore() throws MetaException { - LOG.info(addPrefix("Opening raw store with implementation class:" - + rawStoreClassName)); - Configuration conf = getConf(); - + private static RawStore newRawStoreForConf(Configuration conf) throws MetaException { + HiveConf hiveConf = new HiveConf(conf, HiveConf.class); + String rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); + LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName)); if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { LOG.info("Fastpath, skipping raw store proxy"); try { - RawStore rs = ((Class) MetaStoreUtils.getClass( - rawStoreClassName)).newInstance(); - rs.setConf(conf); + RawStore rs = + ((Class) MetaStoreUtils.getClass(rawStoreClassName)) + .newInstance(); + rs.setConf(hiveConf); return rs; } catch (Exception e) { LOG.error("Unable to instantiate raw store directly in fastpath mode", e); throw new RuntimeException(e); } } - return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get()); }