diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f7e3e3a..ea6cb79 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; @@ -86,23 +87,17 @@ // HiveConf rather than a Configuration. private HiveConf hiveConf; private MessageFactory msgFactory; - private RawStore rs; - - private synchronized void init(HiveConf conf) { - try { - rs = RawStoreProxy.getProxy(conf, conf, - conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999); - } catch (MetaException e) { - LOG.error("Unable to connect to raw store, notifications will not be tracked", e); - rs = null; - } - if (cleaner == null && rs != null) { - cleaner = new CleanerThread(conf, rs); + + private synchronized void init(HiveConf conf) throws MetaException { + if (cleaner == null) { + cleaner = + new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999)); cleaner.start(); } } - public DbNotificationListener(Configuration config) { + public DbNotificationListener(Configuration config) throws MetaException { super(config); // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor // with a Configuration parameter, so we have to declare config as Configuration. But it @@ -473,16 +468,12 @@ private int now() { } // Process this notification by adding it to metastore DB - private void process(NotificationEvent event) { + private void process(NotificationEvent event) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - if (rs != null) { - synchronized (NOTIFICATION_TBL_LOCK) { - LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), - event.getMessage()); - rs.addNotificationEvent(event); - } - } else { - LOG.warn("Dropping event " + event + " since notification is not running."); + synchronized (NOTIFICATION_TBL_LOCK) { + LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), + event.getMessage()); + HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 07eca38..7d4c35f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -227,7 +227,7 @@ public TTransport getTransport(TTransport trans) { public static class HMSHandler extends FacebookBase implements IHMSHandler, ThreadLocalRawStore { public static final Logger LOG = HiveMetaStore.LOG; - private String rawStoreClassName; + private static String rawStoreClassName; private final HiveConf hiveConf; // stores datastore (jpox) properties, // right now they come from jpox.properties @@ -367,6 +367,7 @@ public HMSHandler(String name, HiveConf conf) throws MetaException { public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException { super(name); hiveConf = conf; + rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); isInTest = HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST); synchronized (HMSHandler.class) { if (threadPool == null) { @@ -409,7 +410,6 @@ public HiveConf getHiveConf() { @Override public void init() throws MetaException { - rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL); initListeners = MetaStoreUtils.getMetaStoreListeners( MetaStoreInitListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); @@ -507,7 +507,7 @@ public Object getValue() { fileMetadataManager = new FileMetadataManager((ThreadLocalRawStore)this, hiveConf); } - private String addPrefix(String s) { + private static String addPrefix(String s) { return threadLocalId.get() + ": " + s; } @@ -592,6 +592,17 @@ public RawStore getMS() throws MetaException { return ms; } + public static RawStore getMSForConf(HiveConf hiveConf) throws MetaException { + RawStore ms = threadLocalMS.get(); + if (ms == null) { + ms = newRawStoreForConf(hiveConf); + ms.verifySchema(); + threadLocalMS.set(ms); + ms = threadLocalMS.get(); + } + return ms; + } + private TxnStore getTxnHandler() { TxnStore txn = threadLocalTxn.get(); if (txn == null) { @@ -602,24 +613,26 @@ private TxnStore getTxnHandler() { } private RawStore newRawStore() throws MetaException { - LOG.info(addPrefix("Opening raw store with implementation class:" - + rawStoreClassName)); - Configuration conf = getConf(); + HiveConf hiveConf = (HiveConf) getConf(); + return newRawStoreForConf(hiveConf); + } + private static RawStore newRawStoreForConf(HiveConf hiveConf) throws MetaException { + LOG.info(addPrefix("Opening raw store with implementation class:" + rawStoreClassName)); if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { LOG.info("Fastpath, skipping raw store proxy"); try { - RawStore rs = ((Class) MetaStoreUtils.getClass( - rawStoreClassName)).newInstance(); - rs.setConf(conf); + RawStore rs = + ((Class) MetaStoreUtils.getClass(rawStoreClassName)) + .newInstance(); + rs.setConf(hiveConf); return rs; } catch (Exception e) { LOG.error("Unable to instantiate raw store directly in fastpath mode", e); throw new RuntimeException(e); } } - - return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get()); + return RawStoreProxy.getProxy(hiveConf, hiveConf, rawStoreClassName, threadLocalId.get()); } private void createDefaultDB_core(RawStore ms) throws MetaException, InvalidObjectException {