diff --git src/java/org/apache/hcatalog/listener/NotificationListener.java src/java/org/apache/hcatalog/listener/NotificationListener.java index 0660042..2cef73a 100644 --- src/java/org/apache/hcatalog/listener/NotificationListener.java +++ src/java/org/apache/hcatalog/listener/NotificationListener.java @@ -141,6 +141,7 @@ public class NotificationListener extends MetaStoreEventListener { sd.setSortCols(new ArrayList()); sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSkewedInfo().setSkewedColNames(new ArrayList()); String topicName = getTopicName(partition, partitionEvent); if (topicName != null && !topicName.equals("")) { send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); @@ -231,6 +232,7 @@ public class NotificationListener extends MetaStoreEventListener { sd.setSortCols(new ArrayList()); sd.setParameters(new HashMap()); sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSkewedInfo().setSkewedColNames(new ArrayList()); send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase(), HCatConstants.HCAT_DROP_TABLE_EVENT); diff --git src/test/org/apache/hcatalog/listener/TestNotificationListener.java src/test/org/apache/hcatalog/listener/TestNotificationListener.java index 58ef72c..9a66427 100644 --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java @@ -19,10 +19,10 @@ package org.apache.hcatalog.listener; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -37,38 +37,28 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; -import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; -import org.apache.thrift.TException; +import org.apache.hcatalog.mapreduce.HCatBaseTest; -import junit.framework.TestCase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -public class TestNotificationListener extends TestCase implements - MessageListener { +public class TestNotificationListener extends HCatBaseTest implements MessageListener { - private HiveConf hiveConf; - private Driver driver; - private AtomicInteger cntInvocation = new AtomicInteger(0); + private List actualMessages = new ArrayList(); - @Override - protected void setUp() throws Exception { - - super.setUp(); + @Before + public void setUp() throws Exception { System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); System.setProperty("java.naming.provider.url", @@ -92,34 +82,37 @@ public class TestNotificationListener extends TestCase implements .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); MessageConsumer consumer3 = session.createConsumer(dbTopic); consumer3.setMessageListener(this); - hiveConf = new HiveConf(this.getClass()); + + setUpHiveConf(); hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); SessionState.start(new CliSessionState(hiveConf)); driver = new Driver(hiveConf); + client = new HiveMetaStoreClient(hiveConf); } - @Override - protected void tearDown() throws Exception { - assertEquals(7, cntInvocation.get()); - super.tearDown(); + @After + public void tearDown() throws Exception { + List expectedMessages = Arrays.asList( + HCatConstants.HCAT_ADD_DATABASE_EVENT, + HCatConstants.HCAT_ADD_TABLE_EVENT, + HCatConstants.HCAT_ADD_PARTITION_EVENT, + HCatConstants.HCAT_PARTITION_DONE_EVENT, + HCatConstants.HCAT_DROP_PARTITION_EVENT, + HCatConstants.HCAT_DROP_TABLE_EVENT, + HCatConstants.HCAT_DROP_DATABASE_EVENT); + Assert.assertEquals(expectedMessages, actualMessages); } - public void testAMQListener() throws MetaException, TException, - UnknownTableException, NoSuchObjectException, CommandNeedRetryException, - UnknownDBException, InvalidPartitionException, UnknownPartitionException { + @Test + public void testAMQListener() throws Exception { driver.run("create database mydb"); driver.run("use mydb"); driver.run("create table mytbl (a string) partitioned by (b string)"); driver.run("alter table mytbl add partition(b='2011')"); - HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); Map kvs = new HashMap(1); kvs.put("b", "2011"); - msc.markPartitionForEvent("mydb", "mytbl", kvs, + client.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); driver.run("alter table mytbl drop partition(b='2011')"); driver.run("drop table mytbl"); @@ -128,59 +121,59 @@ public class TestNotificationListener extends TestCase implements @Override public void onMessage(Message msg) { - cntInvocation.incrementAndGet(); - String event; try { event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + actualMessages.add(event); + if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { - assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - assertEquals("mydb", + Assert.assertEquals("mydb", ((Database) ((ObjectMessage) msg).getObject()).getName()); } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); Table tbl = (Table) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); + Assert.assertEquals("mytbl", tbl.getTableName()); + Assert.assertEquals("mydb", tbl.getDbName()); + Assert.assertEquals(1, tbl.getPartitionKeysSize()); } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); Partition part = (Partition) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); + Assert.assertEquals("mytbl", part.getTableName()); + Assert.assertEquals("mydb", part.getDbName()); List vals = new ArrayList(1); vals.add("2011"); - assertEquals(vals, part.getValues()); + Assert.assertEquals(vals, part.getValues()); } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); Partition part = (Partition) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); + Assert.assertEquals("mytbl", part.getTableName()); + Assert.assertEquals("mydb", part.getDbName()); List vals = new ArrayList(1); vals.add("2011"); - assertEquals(vals, part.getValues()); + Assert.assertEquals(vals, part.getValues()); } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Assert.assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); Table tbl = (Table) (((ObjectMessage) msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); + Assert.assertEquals("mytbl", tbl.getTableName()); + Assert.assertEquals("mydb", tbl.getDbName()); + Assert.assertEquals(1, tbl.getPartitionKeysSize()); } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { - assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + Assert.assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg .getJMSDestination().toString()); - assertEquals("mydb", + Assert.assertEquals("mydb", ((Database) ((ObjectMessage) msg).getObject()).getName()); } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + Assert.assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() .toString()); MapMessage mapMsg = (MapMessage) msg; assert mapMsg.getString("b").equals("2011"); diff --git src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java index 4302e67..a5fcd7b 100644 --- src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java +++ src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java @@ -60,17 +60,24 @@ public class HCatBaseTest { @Before public void setUp() throws Exception { if (driver == null) { - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + setUpHiveConf(); driver = new Driver(hiveConf); client = new HiveMetaStoreClient(hiveConf); SessionState.start(new CliSessionState(hiveConf)); } } + /** + * Create a new HiveConf and set properties necessary for unit tests. + */ + protected void setUpHiveConf() { + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + } + protected void logAndRegister(PigServer server, String query) throws IOException { LOG.info("Registering pig query: " + query); server.registerQuery(query);