diff --git hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java index 9e03da4..ef7b575 100644 --- hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java +++ hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestNotificationListener.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -65,6 +67,18 @@ public class TestNotificationListener extends HCatBaseTest implements MessageListener { private List actualMessages = new Vector(); + private static final int MSG_RECEIVED_TIMEOUT = 30; + private static final List expectedMessages = Arrays.asList( + HCatConstants.HCAT_CREATE_DATABASE_EVENT, + HCatConstants.HCAT_CREATE_TABLE_EVENT, + HCatConstants.HCAT_ADD_PARTITION_EVENT, + HCatConstants.HCAT_ALTER_PARTITION_EVENT, + HCatConstants.HCAT_DROP_PARTITION_EVENT, + HCatConstants.HCAT_ALTER_TABLE_EVENT, + HCatConstants.HCAT_DROP_TABLE_EVENT, + HCatConstants.HCAT_DROP_DATABASE_EVENT); + private static final CountDownLatch messageReceivedSignal = + new CountDownLatch(expectedMessages.size()); @Before public void setUp() throws Exception { @@ -105,15 +119,6 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - List expectedMessages = Arrays.asList( - HCatConstants.HCAT_CREATE_DATABASE_EVENT, - HCatConstants.HCAT_CREATE_TABLE_EVENT, - HCatConstants.HCAT_ADD_PARTITION_EVENT, - HCatConstants.HCAT_ALTER_PARTITION_EVENT, - HCatConstants.HCAT_DROP_PARTITION_EVENT, - HCatConstants.HCAT_ALTER_TABLE_EVENT, - HCatConstants.HCAT_DROP_TABLE_EVENT, - HCatConstants.HCAT_DROP_DATABASE_EVENT); Assert.assertEquals(expectedMessages, actualMessages); } @@ -132,6 +137,9 @@ public void testAMQListener() throws Exception { driver.run("alter table mytbl add columns (c int comment 'this is an int', d decimal(3,2))"); driver.run("drop table mytbl"); driver.run("drop database mydb"); + + // Wait until either all messages are processed or a maximum time limit is reached. + messageReceivedSignal.await(MSG_RECEIVED_TIMEOUT, TimeUnit.SECONDS); } @Override @@ -248,5 +256,8 @@ public void onMessage(Message msg) { e.printStackTrace(System.err); assert false; } + finally { + messageReceivedSignal.countDown(); + } } }