diff --git hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f7e3e3a0a71094992fdf4bd3ceea2da0bf7d1ff0..4b434e8b5edfac88fe5ac92c3684c68f0ce5b30c 100644 --- hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; @@ -475,11 +476,14 @@ private int now() { // Process this notification by adding it to metastore DB private void process(NotificationEvent event) { event.setMessageFormat(msgFactory.getMessageFormat()); - if (rs != null) { + // To process the notification in the same JDO transaction as + // as the metadata event, we need to use the same persistancemanager. + RawStore threadLocalRs = HiveMetaStore.HMSHandler.getRawStore(); + if (threadLocalRs != null) { synchronized (NOTIFICATION_TBL_LOCK) { LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), event.getMessage()); - rs.addNotificationEvent(event); + threadLocalRs.addNotificationEvent(event); } } else { LOG.warn("Dropping event " + event + " since notification is not running."); diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 1cf47c36cb490ce0b17ffe312cd2e9fc4bb7cd9a..1773d5799b567b9d1fb6caf642a5748ba43ca7b0 100644 --- itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -76,7 +76,6 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +84,7 @@ * Tests DbNotificationListener when used as a transactional event listener * (hive.metastore.transactional.event.listeners) */ -public class TestDbNotificationListener { +public abstract class TestDbNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class .getName()); private static final int EVENTS_TTL = 30; @@ -97,12 +96,9 @@ private int startTime; private long firstEventId; - @SuppressWarnings("rawtypes") - @BeforeClass - public static void connectToMetastore() throws Exception { + public static void connectToMetastore(HiveConf.ConfVars listenerConfigName) throws Exception { HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, - DbNotificationListener.class.getName()); + conf.setVar(listenerConfigName, DbNotificationListener.class.getName()); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s"); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestNonTransactionalDbNotificationListener.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestNonTransactionalDbNotificationListener.java new file mode 100644 index 0000000000000000000000000000000000000000..a4a0bcfb8f3378b29d1ec7229068fb98ddb6b677 --- /dev/null +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestNonTransactionalDbNotificationListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.listener; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.BeforeClass; + +public class TestNonTransactionalDbNotificationListener extends TestDbNotificationListener { + + @SuppressWarnings("rawtypes") + @BeforeClass + public static void setListenerAndConnectToMetastore() throws Exception { + connectToMetastore(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS); + } +} diff --git itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java new file mode 100644 index 0000000000000000000000000000000000000000..b661ce177fda15d9482eaff37f4539b5094b0c53 --- /dev/null +++ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.listener; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.BeforeClass; + +public class TestTransactionalDbNotificationListener extends TestDbNotificationListener { + + @SuppressWarnings("rawtypes") + @BeforeClass + public static void setListenerAndConnectToMetastore() throws Exception { + connectToMetastore(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS); + } +}