Index: src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (revision 1352734) +++ src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (working copy) @@ -110,18 +110,18 @@ String pigSchema = "(" + "a: " + - "(" + - "aa: chararray, " + - "ab: long, " + - "ac: map[], " + - "ad: { t: (ada: long) }, " + - "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," + - "af: (afa: chararray, afb: long) " + - ")," + - "b: chararray, " + - "c: long, " + - "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " + - ")"; + "(" + + "aa: chararray, " + + "ab: long, " + + "ac: map[], " + + "ad: { t: (ada: long) }, " + + "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," + + "af: (afa: chararray, afb: long) " + + ")," + + "b: chararray, " + + "c: long, " + + "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " + + ")"; // with extra structs String tableSchema = Index: src/test/org/apache/hcatalog/pig/TestHCatStorer.java =================================================================== --- src/test/org/apache/hcatalog/pig/TestHCatStorer.java (revision 1352734) +++ src/test/org/apache/hcatalog/pig/TestHCatStorer.java (working copy) @@ -132,7 +132,7 @@ driver.run("drop table employee"); String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " + - " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE"; + " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE"; int retCode = driver.run(createTable).getResponseCode(); if(retCode != 0) { @@ -148,7 +148,7 @@ PigServer pig = new PigServer(ExecType.LOCAL); pig.setBatchOn(); pig.registerQuery("A = LOAD '"+INPUT_FILE_NAME+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," + - "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); + "emp_gender:chararray,emp_country:chararray,emp_state:chararray);"); pig.registerQuery("TN = FILTER A BY emp_state == 'TN';"); pig.registerQuery("KA = FILTER A BY emp_state == 'KA';"); pig.registerQuery("KL = FILTER A BY emp_state == 'KL';"); @@ -415,7 +415,7 @@ public void testBagNStruct() throws IOException, CommandNeedRetryException{ driver.run("drop table junit_unparted"); String createTable = "create table junit_unparted(b string,a struct<a1:int>, arr_of_struct array<string>, " + - "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE"; + "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE"; int retCode = driver.run(createTable).getResponseCode(); if(retCode != 0) { throw new IOException("Failed to create table."); @@ -430,7 +430,7 @@ server.setBatchOn(); server.registerQuery("A = load '"+INPUT_FILE_NAME+"' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});"); server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," + - " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');" + " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');"
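The testBagNStruct hunk above is where Pig and Hive type names meet: a Pig tuple(...) corresponds to a Hive struct<...>, a bag of tuples to a Hive array (of structs, or of the bare element type when the tuple has a single field, which is why bag{mytup:tuple(s1:chararray)} pairs with array<string> in the DDL), and chararray to string. A minimal sketch of the same store path, not part of the patch: the input file, table name, and two-column schema below are hypothetical, while the call pattern mirrors TestHCatStorer#testBagNStruct.

```java
// Sketch only, under stated assumptions. Assumes a local metastore and a
// pre-created table default.mini_table (b string, arr_of_struct array<string>).
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.hcatalog.pig.HCatStorer;

public class BagNStructSketch {
  public static void main(String[] args) throws Exception {
    PigServer server = new PigServer(ExecType.LOCAL);
    server.setBatchOn();
    // Pig bag{tuple(chararray)} lines up with the Hive column type array<string>.
    server.registerQuery("A = load 'complex.data' as "
        + "(b:chararray, arr_of_struct:bag{mytup:tuple(s1:chararray)});");
    // Same two-argument HCatStorer form as the test: empty partition spec,
    // then the Pig schema of the relation being stored.
    server.registerQuery("store A into 'default.mini_table' using "
        + HCatStorer.class.getName()
        + "('','b:chararray, arr_of_struct:bag{mytup:tuple(s1:chararray)}');");
    server.executeBatch();
  }
}
```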
server.executeBatch(); driver.run("select * from junit_unparted"); Index: src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (revision 1352734) +++ src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (working copy) @@ -41,73 +41,82 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hcatalog.common.HCatConstants; -public class TestMsgBusConnection extends TestCase{ +public class TestMsgBusConnection extends TestCase { - private Driver driver; - private BrokerService broker; - private MessageConsumer consumer; + private Driver driver; + private BrokerService broker; + private MessageConsumer consumer; - @Override - protected void setUp() throws Exception { + @Override + protected void setUp() throws Exception { - super.setUp(); - broker = new BrokerService(); - // configure the broker - broker.addConnector("tcp://localhost:61616?broker.persistent=false"); + super.setUp(); + broker = new BrokerService(); + // configure the broker + broker.addConnector("tcp://localhost:61616?broker.persistent=false"); - broker.start(); + broker.start(); - System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); - connectClient(); - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); + connectClient(); + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } - private void connectClient() throws JMSException{ - ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616"); - Connection conn = connFac.createConnection(); - conn.start(); - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic("planetlab.hcat"); - consumer = session.createConsumer(hcatTopic); - } + private void connectClient() throws JMSException { + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + Connection conn = connFac.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session.createTopic("planetlab.hcat"); + consumer = 
session.createConsumer(hcatTopic); + } - public void testConnection() throws Exception{ + public void testConnection() throws Exception { - try{ - driver.run("create database testconndb"); - Message msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - broker.stop(); - driver.run("drop database testconndb cascade"); - broker.start(true); - connectClient(); - driver.run("create database testconndb"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - driver.run("drop database testconndb cascade"); - msg = consumer.receive(); - assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString()); - assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } catch (NoSuchObjectException nsoe){ - nsoe.printStackTrace(System.err); - assert false; - } catch (AlreadyExistsException aee){ - aee.printStackTrace(System.err); - assert false; - } - } + try { + driver.run("create database testconndb"); + Message msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + broker.stop(); + driver.run("drop database testconndb cascade"); + broker.start(true); + connectClient(); + driver.run("create database testconndb"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, + msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } catch (NoSuchObjectException nsoe) { + nsoe.printStackTrace(System.err); + assert false; + } catch (AlreadyExistsException aee) { + aee.printStackTrace(System.err); + assert false; + } + } } Index: src/test/org/apache/hcatalog/listener/TestNotificationListener.java =================================================================== --- src/test/org/apache/hcatalog/listener/TestNotificationListener.java (revision 1352734) +++ src/test/org/apache/hcatalog/listener/TestNotificationListener.java (working copy) @@ -58,126 +58,137 @@ import junit.framework.TestCase; -public class TestNotificationListener extends TestCase implements MessageListener{ +public class TestNotificationListener extends TestCase implements + MessageListener { - private HiveConf hiveConf; - private Driver driver; - private AtomicInteger cntInvocation = new AtomicInteger(0); + private HiveConf hiveConf; + private Driver 
driver; + private AtomicInteger cntInvocation = new AtomicInteger(0); - @Override - protected void setUp() throws Exception { + @Override + protected void setUp() throws Exception { - super.setUp(); - System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); - System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false"); - ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Connection conn = connFac.createConnection(); - conn.start(); - // We want message to be sent when session commits, thus we run in - // transacted mode. - Session session = conn.createSession(true, Session.SESSION_TRANSACTED); - Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - MessageConsumer consumer1 = session.createConsumer(hcatTopic); - consumer1.setMessageListener(this); - Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl"); - MessageConsumer consumer2 = session.createConsumer(tblTopic); - consumer2.setMessageListener(this); - Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb"); - MessageConsumer consumer3 = session.createConsumer(dbTopic); - consumer3.setMessageListener(this); - hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName()); - hiveConf.set("hive.metastore.local", "true"); - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - SessionState.start(new CliSessionState(hiveConf)); - driver = new Driver(hiveConf); - } + super.setUp(); + System.setProperty("java.naming.factory.initial", + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.provider.url", + "vm://localhost?broker.persistent=false"); + ConnectionFactory connFac = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection conn = connFac.createConnection(); + conn.start(); + // We want message to be sent when session commits, thus we run in + // transacted mode. 
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + Destination hcatTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + MessageConsumer consumer1 = session.createConsumer(hcatTopic); + consumer1.setMessageListener(this); + Destination tblTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl"); + MessageConsumer consumer2 = session.createConsumer(tblTopic); + consumer2.setMessageListener(this); + Destination dbTopic = session + .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb"); + MessageConsumer consumer3 = session.createConsumer(dbTopic); + consumer3.setMessageListener(this); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, + NotificationListener.class.getName()); + hiveConf.set("hive.metastore.local", "true"); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + SessionState.start(new CliSessionState(hiveConf)); + driver = new Driver(hiveConf); + } - @Override - protected void tearDown() throws Exception { - assertEquals(7, cntInvocation.get()); - super.tearDown(); - } + @Override + protected void tearDown() throws Exception { + assertEquals(7, cntInvocation.get()); + super.tearDown(); + } - public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, - CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{ - driver.run("create database mydb"); - driver.run("use mydb"); - driver.run("create table mytbl (a string) partitioned by (b string)"); - driver.run("alter table mytbl add partition(b='2011')"); - HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); - Map<String, String> kvs = new HashMap<String, String>(1); - kvs.put("b", "2011"); - msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE); - driver.run("alter table mytbl drop partition(b='2011')"); - driver.run("drop table mytbl"); - driver.run("drop database mydb"); - } + public void testAMQListener() throws MetaException, TException, + UnknownTableException, NoSuchObjectException, CommandNeedRetryException, + UnknownDBException, InvalidPartitionException, UnknownPartitionException { + driver.run("create database mydb"); + driver.run("use mydb"); + driver.run("create table mytbl (a string) partitioned by (b string)"); + driver.run("alter table mytbl add partition(b='2011')"); + HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf); + Map<String, String> kvs = new HashMap<String, String>(1); + kvs.put("b", "2011"); + msc.markPartitionForEvent("mydb", "mytbl", kvs, + PartitionEventType.LOAD_DONE); + driver.run("alter table mytbl drop partition(b='2011')"); + driver.run("drop table mytbl"); + driver.run("drop database mydb"); + } - @Override - public void onMessage(Message msg) { - cntInvocation.incrementAndGet(); + @Override + public void onMessage(Message msg) { + cntInvocation.incrementAndGet(); - String event; - try { - event = msg.getStringProperty(HCatConstants.HCAT_EVENT); - if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){ + String event; + try { + event = msg.getStringProperty(HCatConstants.HCAT_EVENT); + if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) { - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else
if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){ + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){ + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){ + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List<String> vals = new ArrayList<String>(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString()); - Partition part = (Partition)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", part.getTableName()); - assertEquals("mydb", part.getDbName()); - List<String> vals = new ArrayList<String>(1); - vals.add("2011"); - assertEquals(vals,part.getValues()); - } - else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){ + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + Partition part = (Partition) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", part.getTableName()); + assertEquals("mydb", part.getDbName()); + List<String> vals = new ArrayList<String>(1); + vals.add("2011"); + assertEquals(vals, part.getValues()); + } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { - assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString()); - Table tbl = (Table)(((ObjectMessage)msg).getObject()); - assertEquals("mytbl", tbl.getTableName()); - assertEquals("mydb", tbl.getDbName()); - assertEquals(1, tbl.getPartitionKeysSize()); - } - else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){ + assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString()); + Table tbl = (Table) (((ObjectMessage) msg).getObject()); + assertEquals("mytbl", tbl.getTableName()); + assertEquals("mydb", tbl.getDbName()); + assertEquals(1, tbl.getPartitionKeysSize()); + } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { - assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString()); - assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName()); - } - else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { - assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
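The onMessage dispatch above keys off the HCAT_EVENT string property that NotificationListener stamps on every message; the same property can drive a JMS message selector so the broker filters events before delivery instead of the client branching on them. A minimal subscriber sketch, not part of the patch: the broker URL and the "hcat.mydb.mytbl" topic are taken from these tests, everything else is illustrative.

```java
// Sketch only: receive just add-partition events for mydb.mytbl by putting
// a selector on the HCAT_EVENT property set by NotificationListener.
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.hcatalog.common.HCatConstants;

public class AddPartitionSubscriber {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = connFac.createConnection();
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("hcat.mydb.mytbl");
    // Build the selector from the same constants the listener uses, e.g.
    // "HCAT_EVENT = 'HCAT_ADD_PARTITION'".
    MessageConsumer consumer = session.createConsumer(topic,
        HCatConstants.HCAT_EVENT + " = '"
            + HCatConstants.HCAT_ADD_PARTITION_EVENT + "'");
    Message msg = consumer.receive(); // blocks until a matching event arrives
    System.out.println(msg.getStringProperty(HCatConstants.HCAT_EVENT));
  }
}
```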
- MapMessage mapMsg = (MapMessage)msg; - assert mapMsg.getString("b").equals("2011"); - } else - assert false; - } catch (JMSException e) { - e.printStackTrace(System.err); - assert false; - } - } + assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg + .getJMSDestination().toString()); + assertEquals("mydb", + ((Database) ((ObjectMessage) msg).getObject()).getName()); + } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) { + assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination() + .toString()); + MapMessage mapMsg = (MapMessage) msg; + assert mapMsg.getString("b").equals("2011"); + } else + assert false; + } catch (JMSException e) { + e.printStackTrace(System.err); + assert false; + } + } } Index: src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java =================================================================== --- src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (revision 1352734) +++ src/test/org/apache/hcatalog/cli/TestSemanticAnalysis.java (working copy) @@ -57,7 +57,7 @@ @Override protected void setUp() throws Exception { - System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName()); + System.setProperty(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName()); HiveConf hcatConf = new HiveConf(this.getClass()); hcatConf.set(ConfVars.PREEXECHOOKS.varname, ""); hcatConf.set(ConfVars.POSTEXECHOOKS.varname, ""); @@ -77,14 +77,14 @@ private final String tblName = "junit_sem_analysis"; public void testDescDB() throws CommandNeedRetryException, IOException { - hcatDriver.run("drop database mydb cascade"); - assertEquals(0, hcatDriver.run("create database mydb").getResponseCode()); - CommandProcessorResponse resp = hcatDriver.run("describe database mydb"); - assertEquals(0, resp.getResponseCode()); - ArrayList<String> result = new ArrayList<String>(); - hcatDriver.getResults(result); - assertTrue(result.get(0).contains("mydb.db")); - hcatDriver.run("drop database mydb cascade"); + hcatDriver.run("drop database mydb cascade"); + assertEquals(0, hcatDriver.run("create database mydb").getResponseCode()); + CommandProcessorResponse resp = hcatDriver.run("describe database mydb"); + assertEquals(0, resp.getResponseCode()); + ArrayList<String> result = new ArrayList<String>(); + hcatDriver.getResults(result); + assertTrue(result.get(0).contains("mydb.db")); + hcatDriver.run("drop database mydb cascade"); } public void testCreateTblWithLowerCasePartNames() throws CommandNeedRetryException, MetaException, TException, NoSuchObjectException{ @@ -292,8 +292,8 @@ hcatDriver.run("drop table junit_sem_analysis"); query = "create table junit_sem_analysis (a int) partitioned by (b string) stored as " + - "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " + - "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' "; + "INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT " + + "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver 'mydriver' outputdriver 'yourdriver' "; assertEquals(0,hcatDriver.run(query).getResponseCode()); Table tbl = msc.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tblName); Index: src/test/org/apache/hcatalog/data/TestReaderWriter.java =================================================================== --- src/test/org/apache/hcatalog/data/TestReaderWriter.java (revision 1352734) +++ src/test/org/apache/hcatalog/data/TestReaderWriter.java (working copy) @@ -52,134 +52,143 @@ public class
TestReaderWriter { - @Test - public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException { + @Test + public void test() throws MetaException, CommandNeedRetryException, + IOException, ClassNotFoundException { - HiveConf conf = new HiveConf(getClass()); - Driver driver = new Driver(conf); - SessionState.start(new CliSessionState(conf)); - driver.run("drop table mytbl"); - driver.run("create table mytbl (a string, b int)"); - Iterator<Entry<String, String>> itr = conf.iterator(); - Map<String, String> map = new HashMap<String, String>(); - while(itr.hasNext()){ - Entry<String, String> kv = itr.next(); - map.put(kv.getKey(), kv.getValue()); - } - - WriterContext cntxt = runsInMaster(map); - - File writeCntxtFile = File.createTempFile("hcat-write", "temp"); - writeCntxtFile.deleteOnExit(); - - // Serialize context. - ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile)); - oos.writeObject(cntxt); - oos.flush(); - oos.close(); - - // Now, deserialize it. - ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile)); - cntxt = (WriterContext) ois.readObject(); - ois.close(); - - runsInSlave(cntxt); - commit(map, true, cntxt); - - ReaderContext readCntxt = runsInMaster(map, false); - - File readCntxtFile = File.createTempFile("hcat-read", "temp"); - readCntxtFile.deleteOnExit(); - oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile)); - oos.writeObject(readCntxt); - oos.flush(); - oos.close(); - - ois = new ObjectInputStream(new FileInputStream(readCntxtFile)); - readCntxt = (ReaderContext) ois.readObject(); - ois.close(); - - - for(InputSplit split : readCntxt.getSplits()){ - runsInSlave(split, readCntxt.getConf()); - } - } + HiveConf conf = new HiveConf(getClass()); + Driver driver = new Driver(conf); + SessionState.start(new CliSessionState(conf)); + driver.run("drop table mytbl"); + driver.run("create table mytbl (a string, b int)"); + Iterator<Entry<String, String>> itr = conf.iterator(); + Map<String, String> map = new HashMap<String, String>(); + while (itr.hasNext()) { + Entry<String, String> kv = itr.next(); + map.put(kv.getKey(), kv.getValue()); + } - private WriterContext runsInMaster(Map<String, String> config) throws HCatException { + WriterContext cntxt = runsInMaster(map); - WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); - WriterContext info = writer.prepareWrite(); - return info; - } - - private ReaderContext runsInMaster(Map<String, String> config, boolean bogus) throws HCatException { + File writeCntxtFile = File.createTempFile("hcat-write", "temp"); + writeCntxtFile.deleteOnExit(); - ReadEntity.Builder builder = new ReadEntity.Builder(); - ReadEntity entity = builder.withTable("mytbl").build(); - HCatReader reader = DataTransferFactory.getHCatReader(entity, config); - ReaderContext cntxt = reader.prepareRead(); - return cntxt; - } + // Serialize context. + ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream( + writeCntxtFile)); + oos.writeObject(cntxt); + oos.flush(); + oos.close(); - private void runsInSlave(InputSplit split, Configuration config) throws HCatException { + // Now, deserialize it.
+ ObjectInputStream ois = new ObjectInputStream(new FileInputStream( + writeCntxtFile)); + cntxt = (WriterContext) ois.readObject(); + ois.close(); - HCatReader reader = DataTransferFactory.getHCatReader(split, config); - Iterator<HCatRecord> itr = reader.read(); - int i = 1; - while(itr.hasNext()){ - HCatRecord read = itr.next(); - HCatRecord written = getRecord(i++); - // Argh, HCatRecord doesnt implement equals() - Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), written.get(0).equals(read.get(0))); - Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), written.get(1).equals(read.get(1))); - Assert.assertEquals(2, read.size()); - } - Assert.assertFalse(itr.hasNext()); - } - - private void runsInSlave(WriterContext context) throws HCatException { + runsInSlave(cntxt); + commit(map, true, cntxt); - HCatWriter writer = DataTransferFactory.getHCatWriter(context); - writer.write(new HCatRecordItr()); - } + ReaderContext readCntxt = runsInMaster(map, false); - private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException { + File readCntxtFile = File.createTempFile("hcat-read", "temp"); + readCntxtFile.deleteOnExit(); + oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile)); + oos.writeObject(readCntxt); + oos.flush(); + oos.close(); - WriteEntity.Builder builder = new WriteEntity.Builder(); - WriteEntity entity = builder.withTable("mytbl").build(); - HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); - if(status){ - writer.commit(context); - } else { - writer.abort(context); - } - } + ois = new ObjectInputStream(new FileInputStream(readCntxtFile)); + readCntxt = (ReaderContext) ois.readObject(); + ois.close(); - private static HCatRecord getRecord(int i) { - List<Object> list = new ArrayList<Object>(2); - list.add("Row #: " + i); - list.add(i); - return new DefaultHCatRecord(list); - } - - private static class HCatRecordItr implements Iterator<HCatRecord> { + for (InputSplit split : readCntxt.getSplits()) { + runsInSlave(split, readCntxt.getConf()); + } + } - int i = 0; - @Override - public boolean hasNext() { - return i++ < 100 ?
true : false; - } + private WriterContext runsInMaster(Map<String, String> config) + throws HCatException { - @Override - public HCatRecord next() { - return getRecord(i); - } + WriteEntity.Builder builder = new WriteEntity.Builder(); + WriteEntity entity = builder.withTable("mytbl").build(); + HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); + WriterContext info = writer.prepareWrite(); + return info; + } - @Override - public void remove() { - throw new RuntimeException(); - } - } + private ReaderContext runsInMaster(Map<String, String> config, boolean bogus) + throws HCatException { + + ReadEntity.Builder builder = new ReadEntity.Builder(); + ReadEntity entity = builder.withTable("mytbl").build(); + HCatReader reader = DataTransferFactory.getHCatReader(entity, config); + ReaderContext cntxt = reader.prepareRead(); + return cntxt; + } + + private void runsInSlave(InputSplit split, Configuration config) + throws HCatException { + + HCatReader reader = DataTransferFactory.getHCatReader(split, config); + Iterator<HCatRecord> itr = reader.read(); + int i = 1; + while (itr.hasNext()) { + HCatRecord read = itr.next(); + HCatRecord written = getRecord(i++); + // Argh, HCatRecord doesnt implement equals() + Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), + written.get(0).equals(read.get(0))); + Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), + written.get(1).equals(read.get(1))); + Assert.assertEquals(2, read.size()); + } + Assert.assertFalse(itr.hasNext()); + } + + private void runsInSlave(WriterContext context) throws HCatException { + + HCatWriter writer = DataTransferFactory.getHCatWriter(context); + writer.write(new HCatRecordItr()); + } + + private void commit(Map<String, String> config, boolean status, + WriterContext context) throws IOException { + + WriteEntity.Builder builder = new WriteEntity.Builder(); + WriteEntity entity = builder.withTable("mytbl").build(); + HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); + if (status) { + writer.commit(context); + } else { + writer.abort(context); + } + } + + private static HCatRecord getRecord(int i) { + List<Object> list = new ArrayList<Object>(2); + list.add("Row #: " + i); + list.add(i); + return new DefaultHCatRecord(list); + } + + private static class HCatRecordItr implements Iterator<HCatRecord> { + + int i = 0; + + @Override + public boolean hasNext() { + return i++ < 100 ? true : false; + } + + @Override + public HCatRecord next() { + return getRecord(i); + } + + @Override + public void remove() { + throw new RuntimeException(); + } + } } Index: src/java/org/apache/hcatalog/pig/HCatBaseStorer.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (revision 1352734) +++ src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (working copy) @@ -166,7 +166,7 @@ return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null); case DataType.BYTEARRAY: - return new HCatFieldSchema(fSchema.alias, Type.BINARY, null); + return new HCatFieldSchema(fSchema.alias, Type.BINARY, null); case DataType.BAG: Schema bagSchema = fSchema.schema; Index: src/java/org/apache/hcatalog/pig/HCatStorer.java =================================================================== --- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1352734) +++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy) @@ -156,7 +156,7 @@ //Calling it from here so that the partition publish happens. //This call needs to be removed after MAPREDUCE-1447 is fixed.
getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext( - job.getConfiguration(), new TaskAttemptID())).cleanupJob(job); + job.getConfiguration(), new TaskAttemptID())).cleanupJob(job); } catch (IOException e) { throw new IOException("Failed to cleanup job",e); } catch (InterruptedException e) { Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java =================================================================== --- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 1352734) +++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (working copy) @@ -25,38 +25,39 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; /** - * Shim layer to abstract differences between Hadoop 0.20 and 0.23 (HCATALOG-179). - * This mirrors Hive shims, but is kept separate for HCatalog dependencies. + * Shim layer to abstract differences between Hadoop 0.20 and 0.23 + * (HCATALOG-179). This mirrors Hive shims, but is kept separate for HCatalog + * dependencies. **/ public interface HCatHadoopShims { - public static abstract class Instance { - static HCatHadoopShims instance = selectShim(); - public static HCatHadoopShims get() { - return instance; - } + public static abstract class Instance { + static HCatHadoopShims instance = selectShim(); - private static HCatHadoopShims selectShim() { - // piggyback on Hive's detection logic - String major = ShimLoader.getMajorVersion(); - String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S"; - if (major.startsWith("0.23")) { - shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23"; - } - try { - Class<? extends HCatHadoopShims> clasz = - Class.forName(shimFQN).asSubclass(HCatHadoopShims.class); - return clasz.newInstance(); - } catch (Exception e) { - throw new RuntimeException("Failed to instantiate: " + shimFQN, e); - } - } - } + public static HCatHadoopShims get() { + return instance; + } - public TaskAttemptContext createTaskAttemptContext(Configuration conf, - TaskAttemptID taskId); + private static HCatHadoopShims selectShim() { + // piggyback on Hive's detection logic + String major = ShimLoader.getMajorVersion(); + String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S"; + if (major.startsWith("0.23")) { + shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23"; + } + try { + Class<? extends HCatHadoopShims> clasz = Class.forName(shimFQN) + .asSubclass(HCatHadoopShims.class); + return clasz.newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate: " + shimFQN, e); + } + } + } - public JobContext createJobContext(Configuration conf, - JobID jobId); + public TaskAttemptContext createTaskAttemptContext(Configuration conf, + TaskAttemptID taskId); + public JobContext createJobContext(Configuration conf, JobID jobId); + } Index: src/java/org/apache/hcatalog/listener/NotificationListener.java =================================================================== --- src/java/org/apache/hcatalog/listener/NotificationListener.java (revision 1352734) +++ src/java/org/apache/hcatalog/listener/NotificationListener.java (working copy) @@ -64,288 +64,310 @@ import org.apache.hcatalog.common.HCatConstants; /**
- * All messages also has a property named "HCAT_EVENT" set on them whose value - * can be used to configure message selector on subscriber side. + * Implementation of + * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}. It sends + * messages on two types of topics. One has a name of the form dbName.tblName; on this + * topic, two kinds of messages are sent: add/drop partition and + * finalize_partition messages. The second topic is named "HCAT" and the messages sent on + * it are: add/drop database and add/drop table. All messages also have a + * property named "HCAT_EVENT" set on them whose value can be used to configure a + * message selector on the subscriber side. */ -public class NotificationListener extends MetaStoreEventListener{ +public class NotificationListener extends MetaStoreEventListener { - private static final Log LOG = LogFactory.getLog(NotificationListener.class); - protected Session session; - protected Connection conn; + private static final Log LOG = LogFactory.getLog(NotificationListener.class); + protected Session session; + protected Connection conn; - /** - * Create message bus connection and session in constructor. - */ - public NotificationListener(final Configuration conf) { + /** + * Create message bus connection and session in constructor. + */ + public NotificationListener(final Configuration conf) { - super(conf); - createConnection(); - } + super(conf); + createConnection(); + } - private static String getTopicName(Partition partition, - ListenerEvent partitionEvent) throws MetaException { - try { - return partitionEvent.getHandler() - .get_table(partition.getDbName(), partition.getTableName()) - .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); - } catch (NoSuchObjectException e) { - throw new MetaException(e.toString()); - } - } - - @Override - public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { - // Subscriber can get notification of newly add partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" - if(partitionEvent.getStatus()){ + private static String getTopicName(Partition partition, + ListenerEvent partitionEvent) throws MetaException { + try { + return partitionEvent.getHandler() + .get_table(partition.getDbName(), partition.getTableName()) + .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME); + } catch (NoSuchObjectException e) { + throw new MetaException(e.toString()); + } + } - Partition partition = partitionEvent.getPartition(); - String topicName = getTopicName(partition, partitionEvent); + @Override + public void onAddPartition(AddPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of newly add partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_ADD_PARTITION" + if (partitionEvent.getStatus()) { + + Partition partition = partitionEvent.getPartition(); + String topicName = getTopicName(partition, partitionEvent); if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); - } - else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() - + "." + partition.getTableName() + send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore.
Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + " To enable notifications for this table, please do alter table set properties (" + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=<dbname>.<tablename>) or whatever you want topic name to be."); } - } - } - @Override - public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { - // Subscriber can get notification of dropped partition in a - // particular table by listening on a topic named "dbName.tableName" - // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" - // Datanucleus throws NPE when we try to serialize a partition object - // retrieved from metastore. To workaround that we reset following objects - if(partitionEvent.getStatus()){ - Partition partition = partitionEvent.getPartition(); - StorageDescriptor sd = partition.getSd(); - sd.setBucketCols(new ArrayList<String>()); - sd.setSortCols(new ArrayList<Order>()); - sd.setParameters(new HashMap<String, String>()); - sd.getSerdeInfo().setParameters(new HashMap<String, String>()); - String topicName = getTopicName(partition, partitionEvent); + } + } + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) + throws MetaException { + // Subscriber can get notification of dropped partition in a + // particular table by listening on a topic named "dbName.tableName" + // and message selector string as "HCAT_EVENT = HCAT_DROP_PARTITION" + + // Datanucleus throws NPE when we try to serialize a partition object + // retrieved from metastore. To workaround that we reset following objects + + if (partitionEvent.getStatus()) { + Partition partition = partitionEvent.getPartition(); + StorageDescriptor sd = partition.getSd(); + sd.setBucketCols(new ArrayList<String>()); + sd.setSortCols(new ArrayList<Order>()); + sd.setParameters(new HashMap<String, String>()); + sd.getSerdeInfo().setParameters(new HashMap<String, String>()); + String topicName = getTopicName(partition, partitionEvent); if (topicName != null && !topicName.equals("")) { - send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); - } - else { - LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + partition.getDbName() - + "." + partition.getTableName() + send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT); + } else { + LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for " + + partition.getDbName() + + "." + + partition.getTableName() + " To enable notifications for this table, please do alter table set properties (" + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=<dbname>.<tablename>)
or whatever you want topic name to be."); } - } - } + } + } - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about addition of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_DATABASE" - if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_ADD_DATABASE_EVENT); - } + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) + throws MetaException { + // Subscriber can get notification about addition of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT); + } - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - // Subscriber can get notification about drop of a database in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_DATABASE" - if(dbEvent.getStatus()) - send(dbEvent.getDatabase(),getTopicPrefix(dbEvent.getHandler().getHiveConf()),HCatConstants.HCAT_DROP_DATABASE_EVENT); - } + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + // Subscriber can get notification about drop of a database in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_DATABASE" + if (dbEvent.getStatus()) + send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler() + .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT); + } - @Override - public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about addition of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_ADD_TABLE" - if(tableEvent.getStatus()){ - Table tbl = tableEvent.getTable(); - HMSHandler handler = tableEvent.getHandler(); - HiveConf conf = handler.getHiveConf(); - Table newTbl; - try { - newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()).deepCopy(); - newTbl.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, - getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() - +"." 
+ newTbl.getTableName().toLowerCase()); - handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); - } catch (InvalidOperationException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } catch (NoSuchObjectException e) { - MetaException me = new MetaException(e.toString()); - me.initCause(e); - throw me; - } - send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT); - } - } - - private String getTopicPrefix(HiveConf conf){ - return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); - } - - @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - // Subscriber can get notification about drop of a table in HCAT - // by listening on a topic named "HCAT" and message selector string - // as "HCAT_EVENT = HCAT_DROP_TABLE" + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about addition of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_ADD_TABLE" + if (tableEvent.getStatus()) { + Table tbl = tableEvent.getTable(); + HMSHandler handler = tableEvent.getHandler(); + HiveConf conf = handler.getHiveConf(); + Table newTbl; + try { + newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName()) + .deepCopy(); + newTbl.getParameters().put( + HCatConstants.HCAT_MSGBUS_TOPIC_NAME, + getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "." + + newTbl.getTableName().toLowerCase()); + handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl); + } catch (InvalidOperationException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } catch (NoSuchObjectException e) { + MetaException me = new MetaException(e.toString()); + me.initCause(e); + throw me; + } + send(newTbl, getTopicPrefix(conf) + "." + + newTbl.getDbName().toLowerCase(), + HCatConstants.HCAT_ADD_TABLE_EVENT); + } + } - // Datanucleus throws NPE when we try to serialize a table object - // retrieved from metastore. To workaround that we reset following objects + private String getTopicPrefix(HiveConf conf) { + return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX); + } - if(tableEvent.getStatus()){ - Table table = tableEvent.getTable(); - StorageDescriptor sd = table.getSd(); - sd.setBucketCols(new ArrayList<String>()); - sd.setSortCols(new ArrayList<Order>()); - sd.setParameters(new HashMap<String, String>()); - sd.getSerdeInfo().setParameters(new HashMap<String, String>()); - send(table,getTopicPrefix(tableEvent.getHandler().getHiveConf())+"."+table.getDbName().toLowerCase(), HCatConstants.HCAT_DROP_TABLE_EVENT); - } - } + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + // Subscriber can get notification about drop of a table in HCAT + // by listening on a topic named "HCAT" and message selector string + // as "HCAT_EVENT = HCAT_DROP_TABLE" - /** - * @param msgBody is the metastore object. It is sent in full such that - * if subscriber is really interested in details, it can reconstruct it fully. - * In case of finalize_partition message this will be string specification of - * the partition. - * @param topicName is the name on message broker on which message is sent. - * @param event is the value of HCAT_EVENT property in message. It can be - * used to select messages in client side.
- */ - protected void send(Object msgBody, String topicName, String event){ + // Datanucleus throws NPE when we try to serialize a table object + // retrieved from metastore. To workaround that we reset following objects - try{ + if (tableEvent.getStatus()) { + Table table = tableEvent.getTable(); + StorageDescriptor sd = table.getSd(); + sd.setBucketCols(new ArrayList<String>()); + sd.setSortCols(new ArrayList<Order>()); + sd.setParameters(new HashMap<String, String>()); + sd.getSerdeInfo().setParameters(new HashMap<String, String>()); + send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + + table.getDbName().toLowerCase(), + HCatConstants.HCAT_DROP_TABLE_EVENT); + } + } - Destination topic = null; - if(null == session){ - // this will happen, if we never able to establish a connection. - createConnection(); - if (null == session){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+event); - return; - } - } - try{ - // Topics are created on demand. If it doesn't exist on broker it will - // be created when broker receives this message. - topic = session.createTopic(topicName); - } catch (IllegalStateException ise){ - // this will happen if we were able to establish connection once, but its no longer valid, - // ise is thrown, catch it and retry. - LOG.error("Seems like connection is lost. Retrying", ise); - createConnection(); - topic = session.createTopic(topicName); - } - if (null == topic){ - // Still not successful, return from here. - LOG.error("Invalid session. Failed to send message on topic: "+ - topicName + " event: "+event); - return; - } - MessageProducer producer = session.createProducer(topic); - Message msg; - if (msgBody instanceof Map){ - MapMessage mapMsg = session.createMapMessage(); - Map<String, String> incomingMap = (Map<String, String>)msgBody; - for (Entry<String, String> partCol : incomingMap.entrySet()){ - mapMsg.setString(partCol.getKey(), partCol.getValue()); - } - msg = mapMsg; - } - else { - msg = session.createObjectMessage((Serializable)msgBody); - } + /** + * @param msgBody + * is the metastore object. It is sent in full such that if + * subscriber is really interested in details, it can reconstruct it + * fully. In case of finalize_partition message this will be string + * specification of the partition. + * @param topicName + * is the name on message broker on which message is sent. + * @param event + * is the value of HCAT_EVENT property in message. It can be used to + * select messages in client side. + */ + protected void send(Object msgBody, String topicName, String event) { - msg.setStringProperty(HCatConstants.HCAT_EVENT, event); - producer.send(msg); - // Message must be transacted before we return. - session.commit(); - } catch(Exception e){ - // Gobble up the exception. Message delivery is best effort. - LOG.error("Failed to send message on topic: "+topicName + - " event: "+event , e); - } - } + try { + Destination topic = null; + if (null == session) { + // this will happen, if we never able to establish a connection. + createConnection(); + if (null == session) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + } + try { + // Topics are created on demand. If it doesn't exist on broker it will + // be created when broker receives this message.
+ topic = session.createTopic(topicName); + } catch (IllegalStateException ise) { + // this will happen if we were able to establish connection once, but + // its no longer valid, + // ise is thrown, catch it and retry. + LOG.error("Seems like connection is lost. Retrying", ise); + createConnection(); + topic = session.createTopic(topicName); + } + if (null == topic) { + // Still not successful, return from here. + LOG.error("Invalid session. Failed to send message on topic: " + + topicName + " event: " + event); + return; + } + MessageProducer producer = session.createProducer(topic); + Message msg; + if (msgBody instanceof Map) { + MapMessage mapMsg = session.createMapMessage(); + Map<String, String> incomingMap = (Map<String, String>) msgBody; + for (Entry<String, String> partCol : incomingMap.entrySet()) { + mapMsg.setString(partCol.getKey(), partCol.getValue()); + } + msg = mapMsg; + } else { + msg = session.createObjectMessage((Serializable) msgBody); + } - Context jndiCntxt; - try { - jndiCntxt = new InitialContext(); - ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory"); - Connection conn = connFac.createConnection(); - conn.start(); - conn.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException jmse) { - LOG.error(jmse); - } - }); - // We want message to be sent when session commits, thus we run in - // transacted mode. - session = conn.createSession(true, Session.SESSION_TRANSACTED); - } catch (NamingException e) { - LOG.error("JNDI error while setting up Message Bus connection. " + - "Please make sure file named 'jndi.properties' is in " + - "classpath and contains appropriate key-value pairs.",e); - } catch (JMSException e) { - LOG.error("Failed to initialize connection to message bus",e); - } catch(Throwable t){ - LOG.error("Unable to connect to JMS provider",t); - } - } + msg.setStringProperty(HCatConstants.HCAT_EVENT, event); + producer.send(msg); + // Message must be transacted before we return. + session.commit(); + } catch (Exception e) { + // Gobble up the exception. Message delivery is best effort. + LOG.error("Failed to send message on topic: " + topicName + " event: " + + event, e); + } + } - protected void createConnection(){ + protected void createConnection() { - Context jndiCntxt; + Context jndiCntxt; + try { + jndiCntxt = new InitialContext(); + ConnectionFactory connFac = (ConnectionFactory) jndiCntxt + .lookup("ConnectionFactory"); + Connection conn = connFac.createConnection(); + conn.start(); + conn.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException jmse) { + LOG.error(jmse); + } + }); + // We want message to be sent when session commits, thus we run in + // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED); + } catch (NamingException e) { + LOG.error("JNDI error while setting up Message Bus connection. " + + "Please make sure file named 'jndi.properties' is in " + + "classpath and contains appropriate key-value pairs.", e); + } catch (JMSException e) { + LOG.error("Failed to initialize connection to message bus", e); + } catch (Throwable t) { + LOG.error("Unable to connect to JMS provider", t); + } + } + + @Override + protected void finalize() throws Throwable { + // Close the connection before dying. + try { + if (null != session) + session.close(); + if (conn != null) { + conn.close(); + } + + } catch (Exception ignore) { + LOG.info("Failed to close message bus connection.", ignore); + } + } + + @Override + public void onLoadPartitionDone(LoadPartitionDoneEvent lpde) + throws MetaException { + if (lpde.getStatus()) + send( + lpde.getPartitionName(), + lpde.getTable().getParameters() + .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), + HCatConstants.HCAT_PARTITION_DONE_EVENT); + } + + @Override + public void onAlterPartition(AlterPartitionEvent ape) throws MetaException { + // no-op + } + + @Override + public void onAlterTable(AlterTableEvent ate) throws MetaException { + // no-op + } } Index: src/java/org/apache/hcatalog/cli/HCatCli.java =================================================================== --- src/java/org/apache/hcatalog/cli/HCatCli.java (revision 1352734) +++ src/java/org/apache/hcatalog/cli/HCatCli.java (working copy) @@ -56,11 +56,11 @@ @SuppressWarnings("static-access") public static void main(String[] args) { - try { - LogUtils.initHiveLog4j(); - } catch (LogInitializationException e) { + try { + LogUtils.initHiveLog4j(); + } catch (LogInitializationException e) { - } + } CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class)); ss.in = System.in; @@ -270,7 +270,7 @@ ss.err.println("Failed with exception " + e.getClass().getName() + ":" + e.getMessage() + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); ret = 1; - } + } int cret = driver.close(); if (ret == 0) { Index: src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java =================================================================== --- src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java (revision 1352734) +++ src/java/org/apache/hcatalog/security/StorageDelegationAuthorizationProvider.java (working copy) @@ -91,7 +91,7 @@ //else we do not have anything to delegate to throw new HiveException(String.format("Storage Handler for table:%s is not an instance " + - "of HCatStorageHandler", table.getTableName())); + "of HCatStorageHandler", table.getTableName())); } } else { //return an authorizer for HDFS Index: src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java =================================================================== --- src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (revision 1352734) +++ src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (working copy) @@ -153,7 +153,7 @@ case STRING: return Type.STRING; case BINARY: - return Type.BINARY; + return Type.BINARY; default: throw new TypeNotPresentException(((PrimitiveTypeInfo)basePrimitiveTypeInfo).getTypeName(), null); } Index: src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java =================================================================== --- src/java/org/apache/hcatalog/data/transfer/state/DefaultStateProvider.java (revision 1352734) +++ 
@@ -21,26 +21,27 @@
 import java.text.NumberFormat;
 import java.util.Random;
-
 public class DefaultStateProvider implements StateProvider {
- /** Default implementation. Here, ids are generated randomly.
- */
- @Override
- public int getId() {
-
- NumberFormat numberFormat = NumberFormat.getInstance();
- numberFormat.setMinimumIntegerDigits(5);
- numberFormat.setGroupingUsed(false);
- return Integer.parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
- }
+ /**
+ * Default implementation. Here, ids are generated randomly.
+ */
+ @Override
+ public int getId() {
- private static StateProvider sp;
-
- public static synchronized StateProvider get() {
- if (null == sp) {
- sp = new DefaultStateProvider();
- }
- return sp;
- }
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ return Integer
+ .parseInt(numberFormat.format(Math.abs(new Random().nextInt())));
+ }
+
+ private static StateProvider sp;
+
+ public static synchronized StateProvider get() {
+ if (null == sp) {
+ sp = new DefaultStateProvider();
+ }
+ return sp;
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/state/StateProvider.java (working copy)
@@ -21,14 +21,17 @@
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.TaskTracker;
-/** If external system wants to communicate any state to slaves, they can do so via this interface.
- * One example of this in case of Map-Reduce is ids assigned by {@link JobTracker} to
- * {@link TaskTracker}
+/**
+ * If an external system wants to communicate any state to slaves, it can do so
+ * via this interface. One example of this in case of Map-Reduce is ids assigned
+ * by {@link JobTracker} to {@link TaskTracker}
 */
 public interface StateProvider {
- /** This method should return id assigned to slave node.
- * @return id
- */
- public int getId();
+ /**
+ * This method should return the id assigned to the slave node.
+ *
+ * @return id
+ */
+ public int getId();
}
Index: src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/impl/HCatOutputFormatWriter.java (working copy)
@@ -42,112 +42,121 @@
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
-/** This writer writes via {@link HCatOutputFormat}
+/**
+ * This writer writes via {@link HCatOutputFormat}
 *
 */
 public class HCatOutputFormatWriter extends HCatWriter {
- public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
- super(we, config);
- }
+ public HCatOutputFormatWriter(WriteEntity we, Map<String, String> config) {
+ super(we, config);
+ }
- public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
- super(config, sp);
- }
+ public HCatOutputFormatWriter(Configuration config, StateProvider sp) {
+ super(config, sp);
+ }
- @Override
- public WriterContext prepareWrite() throws HCatException {
- OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(), we.getTableName(), we.getPartitionKVs());
- Job job;
- try {
- job = new Job(conf);
- HCatOutputFormat.setOutput(job, jobInfo);
- HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
- HCatOutputFormat outFormat = new HCatOutputFormat();
- outFormat.checkOutputSpecs(job);
- outFormat.getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).setupJob(job);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- WriterContext cntxt = new WriterContext();
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- }
+ @Override
+ public WriterContext prepareWrite() throws HCatException {
+ OutputJobInfo jobInfo = OutputJobInfo.create(we.getDbName(),
+ we.getTableName(), we.getPartitionKVs());
+ Job job;
+ try {
+ job = new Job(conf);
+ HCatOutputFormat.setOutput(job, jobInfo);
+ HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ outFormat.checkOutputSpecs(job);
+ outFormat.getOutputCommitter(
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()))
+ .setupJob(job);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ WriterContext cntxt = new WriterContext();
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ }
- @Override
- public void write(Iterator<HCatRecord> recordItr) throws HCatException {
-
- int id = sp.getId();
- setVarsInConf(id);
- HCatOutputFormat outFormat = new HCatOutputFormat();
- TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(new TaskID(), id));
- OutputCommitter committer = null;
- RecordWriter<WritableComparable<?>, HCatRecord> writer;
- try {
- committer = outFormat.getOutputCommitter(cntxt);
- committer.setupTask(cntxt);
- writer = outFormat.getRecordWriter(cntxt);
- while(recordItr.hasNext()){
- HCatRecord rec = recordItr.next();
- writer.write(null, rec);
- }
- writer.close(cntxt);
- if(committer.needsTaskCommit(cntxt)){
- committer.commitTask(cntxt);
- }
- } catch (IOException e) {
- if(null != committer) {
- try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
- }
- }
- throw new HCatException("Failed while writing",e);
- } catch (InterruptedException e) {
- if(null != committer) {
- try {
- committer.abortTask(cntxt);
- } catch (IOException e1) {
- throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
- }
- }
- throw new HCatException("Failed while writing", e);
- }
- }
+ @Override
+ public void write(Iterator<HCatRecord> recordItr) throws HCatException {
- @Override
- public void commit(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
- .commitJob(new JobContext(context.getConf(), null));
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
+ int id = sp.getId();
+ setVarsInConf(id);
+ HCatOutputFormat outFormat = new HCatOutputFormat();
+ TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID(
+ new TaskID(), id));
+ OutputCommitter committer = null;
+ RecordWriter<WritableComparable<?>, HCatRecord> writer;
+ try {
+ committer = outFormat.getOutputCommitter(cntxt);
+ committer.setupTask(cntxt);
+ writer = outFormat.getRecordWriter(cntxt);
+ while (recordItr.hasNext()) {
+ HCatRecord rec = recordItr.next();
+ writer.write(null, rec);
+ }
+ writer.close(cntxt);
+ if (committer.needsTaskCommit(cntxt)) {
+ committer.commitTask(cntxt);
+ }
+ } catch (IOException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ } catch (InterruptedException e) {
+ if (null != committer) {
+ try {
+ committer.abortTask(cntxt);
+ } catch (IOException e1) {
+ throw new HCatException(ErrorType.ERROR_INTERNAL_EXCEPTION, e1);
+ }
+ }
+ throw new HCatException("Failed while writing", e);
+ }
+ }
- @Override
- public void abort(WriterContext context) throws HCatException {
- try {
- new HCatOutputFormat().getOutputCommitter(new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
- .abortJob(new JobContext(context.getConf(), null),State.FAILED);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- }
-
- private void setVarsInConf(int id) {
-
- // Following two config keys are required by FileOutputFormat to work correctly.
- // In usual case of Hadoop, JobTracker will set these before launching tasks.
- // Since there is no jobtracker here, we set it ourself.
- conf.setInt("mapred.task.partition", id);
- conf.set("mapred.task.id", "attempt__0000_r_000000_"+id);
- }
+ @Override
+ public void commit(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(
+ new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+ .commitJob(new JobContext(context.getConf(), null));
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ @Override
+ public void abort(WriterContext context) throws HCatException {
+ try {
+ new HCatOutputFormat().getOutputCommitter(
+ new TaskAttemptContext(context.getConf(), new TaskAttemptID()))
+ .abortJob(new JobContext(context.getConf(), null), State.FAILED);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
+
+ private void setVarsInConf(int id) {
+
+ // Following two config keys are required by FileOutputFormat to work
+ // correctly.
+ // In usual case of Hadoop, JobTracker will set these before launching
+ // tasks.
+ // Since there is no jobtracker here, we set it ourselves.
+ conf.setInt("mapred.task.partition", id);
+ conf.set("mapred.task.id", "attempt__0000_r_000000_" + id);
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/impl/HCatInputFormatReader.java (working copy)
@@ -40,98 +40,102 @@
 import org.apache.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
-/** This reader reads via {@link HCatInputFormat}
+/**
+ * This reader reads via {@link HCatInputFormat}
 *
 */
-public class HCatInputFormatReader extends HCatReader{
+public class HCatInputFormatReader extends HCatReader {
- private InputSplit split;
-
- public HCatInputFormatReader(InputSplit split, Configuration config, StateProvider sp) {
- super(config, sp);
- this.split = split;
- }
+ private InputSplit split;
- public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
- super(info,config);
- }
+ public HCatInputFormatReader(InputSplit split, Configuration config,
+ StateProvider sp) {
+ super(config, sp);
+ this.split = split;
+ }
- @Override
- public ReaderContext prepareRead() throws HCatException {
+ public HCatInputFormatReader(ReadEntity info, Map<String, String> config) {
+ super(info, config);
+ }
- try {
- Job job = new Job(conf);
- InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(), re.getTableName(), re.getFilterString());
- HCatInputFormat.setInput(job, jobInfo);
- HCatInputFormat hcif = new HCatInputFormat();
- ReaderContext cntxt = new ReaderContext();
- cntxt.setInputSplits(hcif.getSplits(new JobContext(job.getConfiguration(), null)));
- cntxt.setConf(job.getConfiguration());
- return cntxt;
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED,e);
- }
- }
+ @Override
+ public ReaderContext prepareRead() throws HCatException {
- @Override
- public Iterator<HCatRecord> read() throws HCatException {
+ try {
+ Job job = new Job(conf);
+ InputJobInfo jobInfo = InputJobInfo.create(re.getDbName(),
+ re.getTableName(), re.getFilterString());
+ HCatInputFormat.setInput(job, jobInfo);
+ HCatInputFormat hcif = new HCatInputFormat();
+ ReaderContext cntxt = new ReaderContext();
+ cntxt.setInputSplits(hcif.getSplits(new JobContext(
+ job.getConfiguration(), null)));
+ cntxt.setConf(job.getConfiguration());
+ return cntxt;
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ }
- HCatInputFormat inpFmt = new HCatInputFormat();
- RecordReader<WritableComparable, HCatRecord> rr;
- try {
- TaskAttemptContext cntxt = new TaskAttemptContext(conf, new TaskAttemptID());
- rr = inpFmt.createRecordReader(split, cntxt);
- rr.initialize(split, cntxt);
- } catch (IOException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- } catch (InterruptedException e) {
- throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
- }
- return new HCatRecordItr(rr);
- }
+ @Override
+ public Iterator<HCatRecord> read() throws HCatException {
+ HCatInputFormat inpFmt = new HCatInputFormat();
+ RecordReader<WritableComparable, HCatRecord> rr;
+ try {
+ TaskAttemptContext cntxt = new TaskAttemptContext(conf,
+ new TaskAttemptID());
+ rr = inpFmt.createRecordReader(split, cntxt);
+ rr.initialize(split, cntxt);
+ } catch (IOException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ } catch (InterruptedException e) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED, e);
+ }
+ return new HCatRecordItr(rr);
+ }
- private static class HCatRecordItr implements Iterator<HCatRecord>{
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
- private RecordReader<WritableComparable, HCatRecord> curRecReader;
+ private RecordReader<WritableComparable, HCatRecord> curRecReader;
- HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
- curRecReader = rr;
- }
+ HCatRecordItr(RecordReader<WritableComparable, HCatRecord> rr) {
+ curRecReader = rr;
+ }
- @Override
- public boolean hasNext(){
- try {
- boolean retVal = curRecReader.nextKeyValue();
- if (retVal) {
- return true;
- }
- // if its false, we need to close recordReader.
- curRecReader.close();
- return false;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ @Override
+ public boolean hasNext() {
+ try {
+ boolean retVal = curRecReader.nextKeyValue();
+ if (retVal) {
+ return true;
+ }
+ // if it's false, we need to close recordReader.
+ curRecReader.close();
+ return false;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
- @Override
- public HCatRecord next() {
- try {
- return curRecReader.getCurrentValue();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ @Override
+ public HCatRecord next() {
+ try {
+ return curRecReader.getCurrentValue();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not allowed");
- }
- }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not allowed");
+ }
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/WriterContext.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/WriterContext.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/WriterContext.java (working copy)
@@ -26,38 +26,39 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-/** This contains information obtained at master node to help prepare slave nodes for writer.
- * This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms. Master should serialize it and make it available to slaves to
- * prepare for writes.
+/**
+ * This contains information obtained at master node to help prepare slave nodes
+ * for writer. This class implements {@link Externalizable} so it can be
+ * serialized using standard java mechanisms. Master should serialize it and
+ * make it available to slaves to prepare for writes.
 */
-public class WriterContext implements Externalizable, Configurable{
+public class WriterContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -5899374262971611840L;
- private Configuration conf;
+ private static final long serialVersionUID = -5899374262971611840L;
+ private Configuration conf;
- public WriterContext() {
- conf = new Configuration();
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(final Configuration config) {
- this.conf = config;
- }
+ public WriterContext() {
+ conf = new Configuration();
+ }
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- }
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- }
+ @Override
+ public void setConf(final Configuration config) {
+ this.conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/HCatReader.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/HCatReader.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/HCatReader.java (working copy)
@@ -27,65 +27,75 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** This abstract class is internal to HCatalog and abstracts away the notion of
- * underlying system from which reads will be done.
+/**
+ * This abstract class is internal to HCatalog and abstracts away the notion of
+ * underlying system from which reads will be done.
 */
-public abstract class HCatReader{
+public abstract class HCatReader {
- /** This should be called at master node to obtain {@link ReaderContext} which then should be
- * serialized and sent to slave nodes.
- * @return {@link ReaderContext}
- * @throws HCatException
- */
- public abstract ReaderContext prepareRead() throws HCatException;
-
- /** This should be called at slave nodes to read {@link HCatRecord}s
- * @return {@link Iterator} of {@link HCatRecord}
- * @throws HCatException
- */
- public abstract Iterator<HCatRecord> read() throws HCatException;
-
- /** This constructor will be invoked by {@link DataTransferFactory} at master node.
- * Don't use this constructor. Instead, use {@link DataTransferFactory}
- * @param re
- * @param config
- */
- protected HCatReader(final ReadEntity re, final Map<String, String> config) {
- this(config);
- this.re = re;
- }
+ /**
+ * This should be called at master node to obtain {@link ReaderContext} which
+ * then should be serialized and sent to slave nodes.
+ *
+ * @return {@link ReaderContext}
+ * @throws HCatException
+ */
+ public abstract ReaderContext prepareRead() throws HCatException;
- /** This constructor will be invoked by {@link DataTransferFactory} at slave nodes.
- * Don't use this constructor. Instead, use {@link DataTransferFactory}
- * @param config
- * @param sp
- */
-
- protected HCatReader(final Configuration config, StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
-
- protected ReadEntity re; // This will be null at slaves.
- protected Configuration conf;
- protected ReaderContext info;
- protected StateProvider sp; // This will be null at master.
-
- private HCatReader(final Map<String, String> config) {
- Configuration conf = new Configuration();
- if (null != config) {
- for(Entry<String, String> kv : config.entrySet()){
- conf.set(kv.getKey(), kv.getValue());
- }
- }
- this.conf = conf;
- }
-
- public Configuration getConf() {
- if (null == conf) {
- throw new IllegalStateException("HCatReader is not constructed correctly.");
- }
- return conf;
- }
+ /**
+ * This should be called at slave nodes to read {@link HCatRecord}s
+ *
+ * @return {@link Iterator} of {@link HCatRecord}
+ * @throws HCatException
+ */
+ public abstract Iterator<HCatRecord> read() throws HCatException;
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at master
+ * node. Don't use this constructor. Instead, use {@link DataTransferFactory}
+ *
+ * @param re
+ * @param config
+ */
+ protected HCatReader(final ReadEntity re, final Map<String, String> config) {
+ this(config);
+ this.re = re;
+ }
+
+ /**
+ * This constructor will be invoked by {@link DataTransferFactory} at slave
+ * nodes. Don't use this constructor. Instead, use {@link DataTransferFactory}
+ *
+ * @param config
+ * @param sp
+ */
+
+ protected HCatReader(final Configuration config, StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ protected ReadEntity re; // This will be null at slaves.
+ protected Configuration conf;
+ protected ReaderContext info;
+ protected StateProvider sp; // This will be null at master.
+
+ private HCatReader(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (null != config) {
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ if (null == conf) {
+ throw new IllegalStateException(
+ "HCatReader is not constructed correctly.");
+ }
+ return conf;
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/DataTransferFactory.java (working copy)
@@ -27,74 +27,109 @@
 import org.apache.hcatalog.data.transfer.state.DefaultStateProvider;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** Use this factory to get instances of {@link HCatReader} or {@link HCatWriter} at master and slave nodes.
+/**
+ * Use this factory to get instances of {@link HCatReader} or {@link HCatWriter}
+ * at master and slave nodes.
 */
 public class DataTransferFactory {
- /**
- * This should be called once from master node to obtain an instance of {@link HCatReader}.
- * @param re ReadEntity built using {@link ReadEntity.Builder}
- * @param config any configuration which master node wants to pass to HCatalog
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final ReadEntity re, final Map<String, String> config) {
- // In future, this may examine ReadEntity and/or config to return appropriate HCatReader
- return new HCatInputFormatReader(re, config);
- }
+ /**
+ * This should be called once from master node to obtain an instance of
+ * {@link HCatReader}.
+ *
+ * @param re
+ * ReadEntity built using {@link ReadEntity.Builder}
+ * @param config
+ * any configuration which master node wants to pass to HCatalog
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final ReadEntity re,
+ final Map<String, String> config) {
+ // In future, this may examine ReadEntity and/or config to return
+ // appropriate HCatReader
+ return new HCatInputFormatReader(re, config);
+ }
- /**
- * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
- * @param split input split obtained at master node
- * @param config configuration obtained at master node
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split, final Configuration config) {
- // In future, this may examine config to return appropriate HCatReader
- return getHCatReader(split, config, DefaultStateProvider.get());
- }
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config) {
+ // In future, this may examine config to return appropriate HCatReader
+ return getHCatReader(split, config, DefaultStateProvider.get());
+ }
- /**
- * This should only be called once from every slave node to obtain an instance of {@link HCatReader}.
- * This should be called if an external system has some state to provide to HCatalog.
- * @param split input split obtained at master node
- * @param config configuration obtained at master node
- * @param sp {@link StateProvider}
- * @return {@link HCatReader}
- */
- public static HCatReader getHCatReader(final InputSplit split, final Configuration config, StateProvider sp) {
- // In future, this may examine config to return appropriate HCatReader
- return new HCatInputFormatReader(split, config, sp);
- }
-
- /** This should be called at master node to obtain an instance of {@link HCatWriter}.
- * @param we WriteEntity built using {@link WriteEntity.Builder}
- * @param config any configuration which master wants to pass to HCatalog
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriteEntity we, final Map<String, String> config) {
- // In future, this may examine WriteEntity and/or config to return appropriate HCatWriter
- return new HCatOutputFormatWriter(we, config);
- }
+ /**
+ * This should only be called once from every slave node to obtain an instance
+ * of {@link HCatReader}. This should be called if an external system has some
+ * state to provide to HCatalog.
+ *
+ * @param split
+ * input split obtained at master node
+ * @param config
+ * configuration obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatReader}
+ */
+ public static HCatReader getHCatReader(final InputSplit split,
+ final Configuration config, StateProvider sp) {
+ // In future, this may examine config to return appropriate HCatReader
+ return new HCatInputFormatReader(split, config, sp);
+ }
- /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- * @param cntxt {@link WriterContext} obtained at master node
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt) {
- // In future, this may examine context to return appropriate HCatWriter
- return getHCatWriter(cntxt, DefaultStateProvider.get());
- }
-
- /** This should be called at slave nodes to obtain an instance of {@link HCatWriter}.
- * If an external system has some mechanism for providing state to HCatalog, this constructor
- * can be used.
- * @param cntxt {@link WriterContext} obtained at master node
- * @param sp {@link StateProvider}
- * @return {@link HCatWriter}
- */
- public static HCatWriter getHCatWriter(final WriterContext cntxt, final StateProvider sp) {
- // In future, this may examine context to return appropriate HCatWriter
- return new HCatOutputFormatWriter(cntxt.getConf(), sp);
- }
+ /**
+ * This should be called at master node to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param we
+ * WriteEntity built using {@link WriteEntity.Builder}
+ * @param config
+ * any configuration which master wants to pass to HCatalog
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriteEntity we,
+ final Map<String, String> config) {
+ // In future, this may examine WriteEntity and/or config to return
+ // appropriate HCatWriter
+ return new HCatOutputFormatWriter(we, config);
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return getHCatWriter(cntxt, DefaultStateProvider.get());
+ }
+
+ /**
+ * This should be called at slave nodes to obtain an instance of
+ * {@link HCatWriter}. If an external system has some mechanism for providing
+ * state to HCatalog, this constructor can be used.
+ *
+ * @param cntxt
+ * {@link WriterContext} obtained at master node
+ * @param sp
+ * {@link StateProvider}
+ * @return {@link HCatWriter}
+ */
+ public static HCatWriter getHCatWriter(final WriterContext cntxt,
+ final StateProvider sp) {
+ // In future, this may examine context to return appropriate HCatWriter
+ return new HCatOutputFormatWriter(cntxt.getConf(), sp);
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/EntityBase.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/EntityBase.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/EntityBase.java (working copy)
@@ -20,35 +20,40 @@
 import java.util.Map;
-/** This is a base class for {@link ReadEntity.Builder} / {@link WriteEntity.Builder}. Many fields in them are common,
- * so this class contains the common fields.
+/**
+ * This is a base class for
+ * {@link ReadEntity.Builder} / {@link WriteEntity.Builder}.
+ * Many fields in them are common, so this class
+ * contains the common fields.
 */
abstract class EntityBase {
- String region;
- String tableName;
- String dbName;
- Map<String, String> partitionKVs;
+ String region;
+ String tableName;
+ String dbName;
+ Map<String, String> partitionKVs;
+ /**
+ * Common methods for {@link ReadEntity} and {@link WriteEntity}
+ */
+ abstract static class Entity extends EntityBase {
- /** Common methods for {@link ReadEntity} and {@link WriteEntity}
- */
+ public String getRegion() {
+ return region;
+ }
- abstract static class Entity extends EntityBase{
+ public String getTableName() {
+ return tableName;
+ }
- public String getRegion() {
- return region;
- }
- public String getTableName() {
- return tableName;
- }
- public String getDbName() {
- return dbName;
- }
- public Map<String, String> getPartitionKVs() {
- return partitionKVs;
- }
- }
+ public String getDbName() {
+ return dbName;
+ }
+
+ public Map<String, String> getPartitionKVs() {
+ return partitionKVs;
+ }
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/ReaderContext.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/ReaderContext.java (working copy)
@@ -30,57 +30,59 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hcatalog.mapreduce.HCatSplit;
-/** This class will contain information of different {@link InputSplit} obtained at master node
- * and configuration. This class implements {@link Externalizable} so it can be serialized using
- * standard java mechanisms.
+/**
+ * This class will contain information of different {@link InputSplit} obtained
+ * at master node and configuration. This class implements
+ * {@link Externalizable} so it can be serialized using standard java
+ * mechanisms.
 */
 public class ReaderContext implements Externalizable, Configurable {
- private static final long serialVersionUID = -2656468331739574367L;
- private List<InputSplit> splits;
- private Configuration conf;
+ private static final long serialVersionUID = -2656468331739574367L;
+ private List<InputSplit> splits;
+ private Configuration conf;
- public ReaderContext() {
- this.splits = new ArrayList<InputSplit>();
- this.conf = new Configuration();
- }
-
- public void setInputSplits(final List<InputSplit> splits) {
- this.splits = splits;
- }
-
- public List<InputSplit> getSplits() {
- return splits;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
+ public ReaderContext() {
+ this.splits = new ArrayList<InputSplit>();
+ this.conf = new Configuration();
+ }
- @Override
- public void setConf(final Configuration config) {
- conf = config;
- }
+ public void setInputSplits(final List<InputSplit> splits) {
+ this.splits = splits;
+ }
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- conf.write(out);
- out.writeInt(splits.size());
- for (InputSplit split : splits) {
- ((HCatSplit)split).write(out);
- }
- }
+ public List<InputSplit> getSplits() {
+ return splits;
+ }
- @Override
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- conf.readFields(in);
- int numOfSplits = in.readInt();
- for (int i=0 ; i < numOfSplits; i++) {
- HCatSplit split = new HCatSplit();
- split.readFields(in);
- splits.add(split);
- }
- }
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(final Configuration config) {
+ conf = config;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ conf.write(out);
+ out.writeInt(splits.size());
+ for (InputSplit split : splits) {
+ ((HCatSplit) split).write(out);
+ }
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ conf.readFields(in);
+ int numOfSplits = in.readInt();
+ for (int i = 0; i < numOfSplits; i++) {
+ HCatSplit split = new HCatSplit();
+ split.readFields(in);
+ splits.add(split);
+ }
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/WriteEntity.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/WriteEntity.java (working copy)
@@ -20,52 +20,55 @@
 import java.util.Map;
-public class WriteEntity extends EntityBase.Entity{
+public class WriteEntity extends EntityBase.Entity {
- /** Don't instantiate {@link WriteEntity} directly. Use, {@link Builder} to build
- * {@link WriteEntity}.
- */
+ /**
+ * Don't instantiate {@link WriteEntity} directly. Use {@link Builder} to
+ * build {@link WriteEntity}.
+ */
- private WriteEntity() {
- // Not allowed.
- }
-
- private WriteEntity(Builder builder) {
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- }
-
- /** This class should be used to build {@link WriteEntity}. It follows builder pattern, letting you build
- * your {@link WriteEntity} with whatever level of detail you want.
- *
- */
- public static class Builder extends EntityBase{
-
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
-
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
- }
-
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
- }
-
- public Builder withPartition(final Map<String, String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
-
- public WriteEntity build() {
- return new WriteEntity(this);
- }
-
- }
+ private WriteEntity() {
+ // Not allowed.
+ }
+
+ private WriteEntity(Builder builder) {
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ }
+
+ /**
+ * This class should be used to build {@link WriteEntity}. It follows builder
+ * pattern, letting you build your {@link WriteEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
+
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
+
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
+
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
+
+ public WriteEntity build() {
+ return new WriteEntity(this);
+ }
+
+ }
}
Index: src/java/org/apache/hcatalog/data/transfer/ReadEntity.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/ReadEntity.java (working copy)
@@ -20,66 +20,69 @@
 import java.util.Map;
-public class ReadEntity extends EntityBase.Entity{
+public class ReadEntity extends EntityBase.Entity {
- private String filterString;
+ private String filterString;
- /** Don't instantiate {@link ReadEntity} directly. Use, {@link ReadEntity.Builder} instead.
- *
- */
- private ReadEntity() {
- // Not allowed
- }
-
- private ReadEntity(Builder builder) {
+ /**
+ * Don't instantiate {@link ReadEntity} directly. Use
+ * {@link ReadEntity.Builder} instead.
+ *
+ */
+ private ReadEntity() {
+ // Not allowed
+ }
- this.region = builder.region;
- this.dbName = builder.dbName;
- this.tableName = builder.tableName;
- this.partitionKVs = builder.partitionKVs;
- this.filterString = builder.filterString;
- }
+ private ReadEntity(Builder builder) {
- public String getFilterString() {
- return this.filterString;
- }
+ this.region = builder.region;
+ this.dbName = builder.dbName;
+ this.tableName = builder.tableName;
+ this.partitionKVs = builder.partitionKVs;
+ this.filterString = builder.filterString;
+ }
- /** This class should be used to build {@link ReadEntity}. It follows builder pattern, letting you build
- * your {@link ReadEntity} with whatever level of detail you want.
- *
- */
- public static class Builder extends EntityBase {
+ public String getFilterString() {
+ return this.filterString;
+ }
- private String filterString;
+ /**
+ * This class should be used to build {@link ReadEntity}. It follows builder
+ * pattern, letting you build your {@link ReadEntity} with whatever level of
+ * detail you want.
+ *
+ */
+ public static class Builder extends EntityBase {
- public Builder withRegion(final String region) {
- this.region = region;
- return this;
- }
+ private String filterString;
+ public Builder withRegion(final String region) {
+ this.region = region;
+ return this;
+ }
- public Builder withDatabase(final String dbName) {
- this.dbName = dbName;
- return this;
- }
+ public Builder withDatabase(final String dbName) {
+ this.dbName = dbName;
+ return this;
+ }
- public Builder withTable(final String tblName) {
- this.tableName = tblName;
- return this;
- }
+ public Builder withTable(final String tblName) {
+ this.tableName = tblName;
+ return this;
+ }
- public Builder withPartition(final Map<String, String> partKVs) {
- this.partitionKVs = partKVs;
- return this;
- }
+ public Builder withPartition(final Map<String, String> partKVs) {
+ this.partitionKVs = partKVs;
+ return this;
+ }
- public Builder withFilter(String filterString) {
- this.filterString = filterString;
- return this;
- }
+ public Builder withFilter(String filterString) {
+ this.filterString = filterString;
+ return this;
+ }
- public ReadEntity build() {
- return new ReadEntity(this);
- }
- }
+ public ReadEntity build() {
+ return new ReadEntity(this);
+ }
+ }
}
\ No newline at end of file
Index: src/java/org/apache/hcatalog/data/transfer/HCatWriter.java
===================================================================
--- src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (revision 1352734)
+++ src/java/org/apache/hcatalog/data/transfer/HCatWriter.java (working copy)
@@ -27,69 +27,87 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.transfer.state.StateProvider;
-/** This abstraction is internal to HCatalog. This is to facilitate writing to HCatalog from external
- * systems. Don't try to instantiate this directly. Instead, use {@link DataTransferFactory}
+/**
+ * This abstraction is internal to HCatalog. This is to facilitate writing to
+ * HCatalog from external systems. Don't try to instantiate this directly.
+ * Instead, use {@link DataTransferFactory}
 */
 public abstract class HCatWriter {
- protected Configuration conf;
- protected WriteEntity we; // This will be null at slave nodes.
- protected WriterContext info;
- protected StateProvider sp;
-
- /** External system should invoke this method exactly once from a master node.
- * @return {@link WriterContext} This should be serialized and sent to slave nodes to
- * construct HCatWriter there.
- * @throws HCatException
- */
- public abstract WriterContext prepareWrite() throws HCatException;
-
- /** This method should be used at slave needs to perform writes.
- * @param recordItr {@link Iterator} records to be written into HCatalog.
- * @throws {@link HCatException}
- */
- public abstract void write(final Iterator<HCatRecord> recordItr) throws HCatException;
-
- /** This method should be called at master node. Primary purpose of this is to do metadata commit.
- * @throws {@link HCatException}
- */
- public abstract void commit(final WriterContext context) throws HCatException;
-
- /** This method should be called at master node. Primary purpose of this is to do cleanups in case
- * of failures.
- * @throws {@link HCatException} *
- */
- public abstract void abort(final WriterContext context) throws HCatException;
-
- /**
- * This constructor will be used at master node
- * @param we WriteEntity defines where in storage records should be written to.
- * @param config Any configuration which external system wants to communicate to HCatalog
- * for performing writes.
- */
- protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
- this(config);
- this.we = we;
- }
-
- /** This constructor will be used at slave nodes.
- * @param config
- */
- protected HCatWriter(final Configuration config, final StateProvider sp) {
- this.conf = config;
- this.sp = sp;
- }
+ protected Configuration conf;
+ protected WriteEntity we; // This will be null at slave nodes.
+ protected WriterContext info;
+ protected StateProvider sp;
- private HCatWriter(final Map<String, String> config) {
- Configuration conf = new Configuration();
- if(config != null){
- // user is providing config, so it could be null.
- for(Entry<String, String> kv : config.entrySet()){
- conf.set(kv.getKey(), kv.getValue());
- }
- }
+ /**
+ * External system should invoke this method exactly once from a master node.
+ *
+ * @return {@link WriterContext} This should be serialized and sent to slave
+ * nodes to construct HCatWriter there.
+ * @throws HCatException
+ */
+ public abstract WriterContext prepareWrite() throws HCatException;
- this.conf = conf;
- }
+ /**
+ * This method should be used at slave nodes to perform writes.
+ *
+ * @param recordItr
+ * {@link Iterator} records to be written into HCatalog.
+ * @throws {@link HCatException}
+ */
+ public abstract void write(final Iterator<HCatRecord> recordItr)
+ throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do metadata commit.
+ *
+ * @throws {@link HCatException}
+ */
+ public abstract void commit(final WriterContext context) throws HCatException;
+
+ /**
+ * This method should be called at master node. Primary purpose of this is to
+ * do cleanups in case of failures.
+ *
+ * @throws {@link HCatException}
+ */
+ public abstract void abort(final WriterContext context) throws HCatException;
+
+ /**
+ * This constructor will be used at master node
+ *
+ * @param we
+ * WriteEntity defines where in storage records should be written to.
+ * @param config
+ * Any configuration which external system wants to communicate to
+ * HCatalog for performing writes.
+ */
+ protected HCatWriter(final WriteEntity we, final Map<String, String> config) {
+ this(config);
+ this.we = we;
+ }
+
+ /**
+ * This constructor will be used at slave nodes.
+ *
+ * @param config
+ */
+ protected HCatWriter(final Configuration config, final StateProvider sp) {
+ this.conf = config;
+ this.sp = sp;
+ }
+
+ private HCatWriter(final Map<String, String> config) {
+ Configuration conf = new Configuration();
+ if (config != null) {
+ // user is providing config, so it could be null.
+ for (Entry<String, String> kv : config.entrySet()) {
+ conf.set(kv.getKey(), kv.getValue());
+ }
+ }
+
+ this.conf = conf;
+ }
}
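For reference, a minimal sketch of how the master/slave read flow described in the javadoc above is meant to be driven. This is based only on the API surface visible in this patch (ReadEntity.Builder, DataTransferFactory, HCatReader, ReaderContext); the database and table names are hypothetical, and in a real deployment the ReaderContext would be serialized and shipped to slave nodes rather than iterated in-process:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hcatalog.data.transfer.HCatReader;
import org.apache.hcatalog.data.transfer.ReadEntity;
import org.apache.hcatalog.data.transfer.ReaderContext;

public class ReadExample {
  public static void main(String[] args) throws Exception {
    // Master side: describe what to read and obtain a serializable context.
    ReadEntity entity = new ReadEntity.Builder()
        .withDatabase("default")   // hypothetical database
        .withTable("my_table")     // hypothetical table
        .build();
    Map<String, String> config = new HashMap<String, String>();
    HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
    ReaderContext context = masterReader.prepareRead();

    // Slave side: each slave would receive the serialized context and one
    // split; here we loop over all splits in one process for illustration.
    for (InputSplit split : context.getSplits()) {
      HCatReader slaveReader =
          DataTransferFactory.getHCatReader(split, context.getConf());
      Iterator<HCatRecord> itr = slaveReader.read();
      while (itr.hasNext()) {
        System.out.println(itr.next());
      }
    }
  }
}

A write would follow the same shape under these APIs: prepareWrite() at the master yields a WriterContext, write(Iterator<HCatRecord>) runs at slave nodes via getHCatWriter(WriterContext), and commit(WriterContext) (or abort(WriterContext) on failure) finishes the job back at the master.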