diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java index 775c1d6..153e51a 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java @@ -23,22 +23,19 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.hive.hcatalog.messaging.AddPartitionMessage; +import org.apache.hive.hcatalog.messaging.CreateTableMessage; +import org.apache.hive.hcatalog.messaging.MessageDeserializer; +import org.apache.hive.hcatalog.messaging.MessageFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.listener.DbNotificationListener; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -57,6 +54,7 @@ private static final Logger LOG = LoggerFactory.getLogger(TestHCatClientNotification.class.getName()); private static HCatClient hCatClient; + private static MessageDeserializer md = null; private int startTime; private long firstEventId; @@ -65,6 +63,7 @@ public static void 
setupClient() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, DbNotificationListener.class.getName()); hCatClient = HCatClient.create(conf); + md = MessageFactory.getInstance().getDeserializer(); } @Before @@ -130,12 +129,13 @@ public void createTable() throws Exception { assertEquals("hcatcreatetable", event.getTableName()); // Parse the message field - ObjectNode jsonTree = getJsonTree(event); - assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText()); - assertEquals("default", jsonTree.get("db").asText()); - assertEquals("hcatcreatetable", jsonTree.get("table").asText()); - Table tableObj = JSONMessageFactory.getTableObj(jsonTree); - assertEquals(table.toHiveTable(), tableObj); + CreateTableMessage createTableMessage = md.getCreateTableMessage(event.getMessage()); + assertEquals(dbName, createTableMessage.getDB()); + assertEquals(tableName, createTableMessage.getTable()); + + // fetch the table marked by the message and compare + HCatTable createdTable = hCatClient.getTable(dbName,tableName); + assertTrue(createdTable.diff(table).equals(HCatTable.NO_DIFF)); } // TODO - Currently no way to test alter table, as this interface doesn't support alter table @@ -189,18 +189,20 @@ public void addPartition() throws Exception { assertEquals(tableName, event.getTableName()); // Parse the message field - ObjectNode jsonTree = getJsonTree(event); - assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText()); - assertEquals("default", jsonTree.get("db").asText()); - assertEquals("hcataddparttable", jsonTree.get("table").asText()); - List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); - HCatPartition hcatPart = new HCatPartition(table, partitionObjList.get(0)); - assertEquals(part.getDatabaseName(), hcatPart.getDatabaseName()); - assertEquals(part.getTableName(), hcatPart.getTableName()); - assertEquals(part.getValues(), 
hcatPart.getValues()); - assertEquals(part.getColumns(), hcatPart.getColumns()); - assertEquals(part.getPartColumns(), hcatPart.getPartColumns()); - assertEquals(part.getLocation(), hcatPart.getLocation()); + AddPartitionMessage addPartitionMessage = md.getAddPartitionMessage(event.getMessage()); + assertEquals(dbName, addPartitionMessage.getDB()); + assertEquals(tableName, addPartitionMessage.getTable()); + List> ptndescs = addPartitionMessage.getPartitions(); + + // fetch the partition referred to by the message and compare + HCatPartition addedPart = hCatClient.getPartition(dbName, tableName, ptndescs.get(0)); + + assertEquals(part.getDatabaseName(), addedPart.getDatabaseName()); + assertEquals(part.getTableName(), addedPart.getTableName()); + assertEquals(part.getValues(), addedPart.getValues()); + assertEquals(part.getColumns(), addedPart.getColumns()); + assertEquals(part.getPartColumns(), addedPart.getPartColumns()); + assertEquals(part.getLocation(), addedPart.getLocation()); } // TODO - currently no way to test alter partition, as HCatClient doesn't support it. 
@@ -283,9 +285,4 @@ public boolean accept(NotificationEvent event) { assertEquals(firstEventId + 1, events.get(0).getEventId()); } - private ObjectNode getJsonTree(HCatNotificationEvent event) throws Exception { - JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage()); - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonParser, ObjectNode.class); - } } diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 4eabb24..288e914 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -53,14 +54,25 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import 
org.apache.hadoop.hive.metastore.messaging.DropIndexMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; -import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -79,6 +91,7 @@ private static Map emptyParameters = new HashMap(); private static IMetaStoreClient msClient; private static Driver driver; + private static MessageDeserializer md = null; private int startTime; private long firstEventId; @@ -108,6 +121,7 @@ public static void connectToMetastore() throws Exception { SessionState.start(new CliSessionState(conf)); msClient = new HiveMetaStoreClient(conf); driver = new Driver(conf); + md = MessageFactory.getInstance().getDeserializer(); } @Before @@ -145,9 +159,8 @@ public void createDatabase() throws Exception { assertNull(event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.CREATE_DATABASE.toString(), jsonTree.get("eventType").asText()); - assertEquals(dbName, jsonTree.get("db").asText()); + CreateDatabaseMessage createDbMsg = md.getCreateDatabaseMessage(event.getMessage()); + assertEquals(dbName, createDbMsg.getDB()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -188,9 +201,8 @@ public void dropDatabase() throws Exception { 
assertNull(event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.DROP_DATABASE.toString(), jsonTree.get("eventType").asText()); - assertEquals(dbName, jsonTree.get("db").asText()); + DropDatabaseMessage dropDbMsg = md.getDropDatabaseMessage(event.getMessage()); + assertEquals(dbName, dropDbMsg.getDB()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -237,12 +249,10 @@ public void createTable() throws Exception { assertEquals(tblName, event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.CREATE_TABLE.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); - Table tableObj = JSONMessageFactory.getTableObj(jsonTree); - assertEquals(table, tableObj); + CreateTableMessage createTblMsg = md.getCreateTableMessage(event.getMessage()); + assertEquals(defaultDbName, createTblMsg.getDB()); + assertEquals(tblName, createTblMsg.getTable()); + assertEquals(table, createTblMsg.getTableObj()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -297,8 +307,7 @@ public void alterTable() throws Exception { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); - AlterTableMessage alterTableMessage = - JSONMessageFactory.getInstance().getDeserializer().getAlterTableMessage(event.getMessage()); + AlterTableMessage alterTableMessage = md.getAlterTableMessage(event.getMessage()); assertEquals(table, alterTableMessage.getTableObjAfter()); // When hive.metastore.transactional.event.listeners is set, @@ -348,10 +357,9 @@ public void dropTable() throws Exception { assertEquals(tblName, event.getTableName()); // Parse the message field - 
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.DROP_TABLE.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); + DropTableMessage dropTblMsg = md.getDropTableMessage(event.getMessage()); + assertEquals(defaultDbName, dropTblMsg.getDB()); + assertEquals(tblName, dropTblMsg.getTable()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -411,12 +419,12 @@ public void addPartition() throws Exception { assertEquals(tblName, event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.ADD_PARTITION.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); - List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); - assertEquals(partition, partitionObjList.get(0)); + AddPartitionMessage addPtnMsg = md.getAddPartitionMessage(event.getMessage()); + assertEquals(defaultDbName, addPtnMsg.getDB()); + assertEquals(tblName, addPtnMsg.getTable()); + Iterator ptnIter = addPtnMsg.getPartitionObjs().iterator(); + assertTrue(ptnIter.hasNext()); + assertEquals(partition, ptnIter.next()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -479,14 +487,10 @@ public void alterPartition() throws Exception { assertEquals(tblName, event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.ALTER_PARTITION.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); - - AlterPartitionMessage alterPartitionMessage = - 
JSONMessageFactory.getInstance().getDeserializer().getAlterPartitionMessage(event.getMessage()); - assertEquals(newPart, alterPartitionMessage.getPtnObjAfter()); + AlterPartitionMessage alterPtnMsg = md.getAlterPartitionMessage(event.getMessage()); + assertEquals(defaultDbName, alterPtnMsg.getDB()); + assertEquals(tblName, alterPtnMsg.getTable()); + assertEquals(newPart, alterPtnMsg.getPtnObjAfter()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -543,10 +547,9 @@ public void dropPartition() throws Exception { assertEquals(tblName, event.getTableName()); // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.DROP_PARTITION.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); + DropPartitionMessage dropPtnMsg = md.getDropPartitionMessage(event.getMessage()); + assertEquals(defaultDbName, dropPtnMsg.getDB()); + assertEquals(tblName, dropPtnMsg.getTable()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -592,7 +595,9 @@ public void createFunction() throws Exception { assertEquals(defaultDbName, event.getDbName()); // Parse the message field - Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event)); + CreateFunctionMessage createFuncMsg = md.getCreateFunctionMessage(event.getMessage()); + assertEquals(defaultDbName, createFuncMsg.getDB()); + Function funcObj = createFuncMsg.getFunctionObj(); assertEquals(defaultDbName, funcObj.getDbName()); assertEquals(funcName, funcObj.getFunctionName()); assertEquals(funcClass, funcObj.getClassName()); @@ -647,15 +652,9 @@ public void dropFunction() throws Exception { assertEquals(defaultDbName, event.getDbName()); // Parse the message field - Function funcObj = 
JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event)); - assertEquals(defaultDbName, funcObj.getDbName()); - assertEquals(funcName, funcObj.getFunctionName()); - assertEquals(funcClass, funcObj.getClassName()); - assertEquals(ownerName, funcObj.getOwnerName()); - assertEquals(FunctionType.JAVA, funcObj.getFunctionType()); - assertEquals(1, funcObj.getResourceUrisSize()); - assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType()); - assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri()); + DropFunctionMessage dropFuncMsg = md.getDropFunctionMessage(event.getMessage()); + assertEquals(defaultDbName, dropFuncMsg.getDB()); + assertEquals(funcName, dropFuncMsg.getFunctionName()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -714,7 +713,9 @@ public void createIndex() throws Exception { assertEquals(dbName, event.getDbName()); // Parse the message field - Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event)); + CreateIndexMessage createIdxMessage = md.getCreateIndexMessage(event.getMessage()); + assertEquals(dbName, createIdxMessage.getDB()); + Index indexObj = createIdxMessage.getIndexObj(); assertEquals(dbName, indexObj.getDbName()); assertEquals(indexName, indexObj.getIndexName()); assertEquals(tableName, indexObj.getOrigTableName()); @@ -780,11 +781,11 @@ public void dropIndex() throws Exception { assertEquals(dbName, event.getDbName()); // Parse the message field - Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event)); - assertEquals(dbName, indexObj.getDbName()); - assertEquals(indexName.toLowerCase(), indexObj.getIndexName()); - assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName()); - assertEquals(indexTableName.toLowerCase(), indexObj.getIndexTableName()); + DropIndexMessage dropIdxMsg = md.getDropIndexMessage(event.getMessage()); + assertEquals(dbName, 
dropIdxMsg.getDB()); + assertEquals(indexName.toLowerCase(), dropIdxMsg.getIndexName()); + assertEquals(indexTableName.toLowerCase(), dropIdxMsg.getIndexTableName()); + assertEquals(tableName.toLowerCase(), dropIdxMsg.getOrigTableName()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -852,8 +853,8 @@ public void alterIndex() throws Exception { assertEquals(dbName, event.getDbName()); // Parse the message field - Index indexObj = - JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson"); + AlterIndexMessage alterIdxMsg = md.getAlterIndexMessage(event.getMessage()); + Index indexObj = alterIdxMsg.getIndexObjAfter(); assertEquals(dbName, indexObj.getDbName()); assertEquals(indexName, indexObj.getIndexName()); assertEquals(tableName, indexObj.getOrigTableName()); @@ -913,7 +914,7 @@ public void insertTable() throws Exception { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName); + verifyInsert(event, defaultDbName, tblName); } @Test @@ -967,14 +968,14 @@ public void insertPartition() throws Exception { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName); - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - LinkedHashMap partKeyValsFromNotif = - JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"), - new LinkedHashMap()); - assertEquals(partKeyVals, partKeyValsFromNotif); + verifyInsert(event, defaultDbName, tblName); + InsertMessage insertMessage = md.getInsertMessage(event.getMessage()); + Map partKeyValsFromNotif = insertMessage.getPartitionKeyValues(); + + assertMapEquals(partKeyVals, partKeyValsFromNotif); } + @Test public void getOnlyMaxEvents() throws Exception { Database db = new Database("db1", 
"no description", "file:/tmp", emptyParameters); @@ -1057,7 +1058,7 @@ public void sqlInsertTable() throws Exception { assertEquals(firstEventId + 3, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName); + verifyInsert(event, defaultDbName, tblName); event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); @@ -1090,7 +1091,7 @@ public void sqlCTAS() throws Exception { assertEquals(firstEventId + 3, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, sourceTblName); + verifyInsert(event, null, sourceTblName); event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); @@ -1165,13 +1166,13 @@ public void sqlInsertPartition() throws Exception { assertEquals(firstEventId + 4, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName); + verifyInsert(event, null, tblName); event = rsp.getEvents().get(6); assertEquals(firstEventId + 7, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName); + verifyInsert(event, null, tblName); event = rsp.getEvents().get(9); assertEquals(firstEventId + 10, event.getEventId()); @@ -1181,13 +1182,13 @@ public void sqlInsertPartition() throws Exception { assertEquals(firstEventId + 11, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName); + verifyInsert(event, null, tblName); event = rsp.getEvents().get(13); assertEquals(firstEventId + 14, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName); + 
verifyInsert(event, null, tblName); event = rsp.getEvents().get(16); assertEquals(firstEventId + 17, event.getEventId()); @@ -1223,23 +1224,36 @@ public void sqlInsertPartition() throws Exception { assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); } - private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName) throws Exception { + private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception { // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - System.out.println("JSONInsertMessage: " + jsonTree.toString()); - assertEquals(EventType.INSERT.toString(), jsonTree.get("eventType").asText()); - if (dbName != null) { - assertEquals(dbName, jsonTree.get("db").asText()); + InsertMessage insertMsg = md.getInsertMessage(event.getMessage()); + System.out.println("InsertMessage: " + insertMsg.toString()); + if (dbName != null ){ + assertEquals(dbName, insertMsg.getDB()); } - if (tblName != null) { - assertEquals(tblName, jsonTree.get("table").asText()); + if (tblName != null){ + assertEquals(tblName, insertMsg.getTable()); } // Should have list of files - List files = - JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList()); + List files = insertMsg.getFiles(); assertTrue(files.size() > 0); } + + private void assertMapEquals(Map map1, Map map2) { + // non ordered, non-classed map comparison - use sparingly instead of assertEquals + // only if you're sure that the order does not matter. 
+ if ((map1 == null) || (map2 == null)){ + assertNull(map1); + assertNull(map2); + } + assertEquals(map1.size(),map2.size()); + for (String k : map1.keySet()){ + assertTrue(map2.containsKey(k)); + assertEquals(map1.get(k), map2.get(k)); + } + } + @Test public void cleanupNotifs() throws Exception { Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 6b86080..5bdbfd5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql; -import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; @@ -44,7 +43,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import static junit.framework.Assert.assertTrue; @@ -69,6 +67,13 @@ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private ArrayList lastResults; + private final boolean VERIFY_SETUP_STEPS = true; + // if verifySetup is set to true, all the test setup we do will perform additional + // verifications as well, which is useful to verify that our setup occurred + // correctly when developing and debugging tests. These verifications, however + // do not test any new functionality for replication, and thus, are not relevant + // for testing replication itself. For steady state, we want this to be false. 
+ @BeforeClass public static void setUpBeforeClass() throws Exception { hconf = new HiveConf(TestReplicationScenarios.class); @@ -130,7 +135,7 @@ private synchronized void advanceDumpDir() { * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned. * Inserts data into one of the ptned tables, and one of the unptned tables, * and verifies that a REPL DUMP followed by a REPL LOAD is able to load it - * appropriately. + * appropriately. This tests bootstrap behaviour primarily. */ @Test public void testBasic() throws IOException { @@ -159,18 +164,13 @@ public void testBasic() throws IOException { createTestDataFile(ptn_locn_2, ptn_data_2); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); - run("SELECT a from " + dbName + ".ptned WHERE b=1"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(ptn_data_2); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); - run("SELECT * from " + dbName + ".unptned_empty"); - verifyResults(empty); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); advanceDumpDir(); run("REPL DUMP " + dbName); @@ -183,16 +183,11 @@ public void testBasic() throws IOException { run("REPL STATUS " + dbName + "_dupe"); verifyResults(new String[] {replDumpId}); - run("SELECT * from " + dbName + "_dupe.unptned"); - 
verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(ptn_data_2); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); - run("SELECT * from " + dbName + ".unptned_empty"); - verifyResults(empty); + verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2); + verifyRun("SELECT a from " + dbName + ".ptned_empty", empty); + verifyRun("SELECT * from " + dbName + ".unptned_empty", empty); } @Test @@ -228,34 +223,29 @@ public void testIncrementalAdds() throws IOException { createTestDataFile(ptn_locn_1, ptn_data_1); createTestDataFile(ptn_locn_2, ptn_data_2); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); - run("SELECT * from " + dbName + ".unptned_empty"); - verifyResults(empty); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); + + // Now, we load data into the tables, and see if an incremental + // repl drop/load can duplicate it. 
run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned_late"); - verifyResults(unptn_data); - + verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); - run("SELECT a from " + dbName + ".ptned WHERE b=1"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2); run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)"); - run("SELECT a from " + dbName + ".ptned_late WHERE b=1"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1",ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)"); - run("SELECT a from " + dbName + ".ptned_late WHERE b=2"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2); + // Perform REPL-DUMP/LOAD advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId ); String incrementalDumpLocn = getResult(0,0); @@ -272,32 +262,22 @@ public void testIncrementalAdds() throws IOException { // incremental dump. 
Currently, the dump id fetched will be the last dump id at the time // the db was created from the bootstrap export dump - run("SELECT * from " + dbName + "_dupe.unptned_empty"); - verifyResults(empty); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); + // VERIFY tables and partitions on destination for equivalence. + verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty); -// run("SELECT * from " + dbName + "_dupe.unptned"); -// verifyResults(unptn_data); +// verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after // fixing that. - run("SELECT * from " + dbName + "_dupe.unptned_late"); - verifyResults(unptn_data); - - - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(ptn_data_2); + verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data); - // verified up to here. 
- run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2"); - verifyResults(ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2); } @Test @@ -326,20 +306,18 @@ public void testDrops() throws IOException { createTestDataFile(ptn_locn_2, ptn_data_2); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); - run("SELECT a from " + dbName + ".ptned WHERE b='1'"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); - run("SELECT a from " + dbName + ".ptned WHERE b='2'"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); - run("SELECT a from " + dbName + ".ptned2 WHERE b='1'"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); - run("SELECT a from " + dbName + ".ptned2 WHERE b='2'"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2); + + // At this point, we've 
set up all the tables and ptns we're going to test drops across + // Replicate it first, and then we'll drop it on the source. advanceDumpDir(); run("REPL DUMP " + dbName); @@ -348,28 +326,23 @@ public void testDrops() throws IOException { run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + verifySetup("REPL STATUS " + dbName + "_dupe", new String[] {replDumpId}); - run("REPL STATUS " + dbName + "_dupe"); - verifyResults(new String[] {replDumpId}); + verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data); + verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2); + verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2); - run("SELECT * from " + dbName + "_dupe.unptned"); - verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'"); - verifyResults(ptn_data_2); - run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'"); - verifyResults(ptn_data_2); + // All tables good on destination, drop on source. 
run("DROP TABLE " + dbName + ".unptned"); run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')"); run("DROP TABLE " + dbName + ".ptned2"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + ".ptned"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty); + verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1); + + // replicate the incremental drops advanceDumpDir();; run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -380,6 +353,10 @@ public void testDrops() throws IOException { printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + // verify that drops were replicated. This can either be from tables or ptns + // not existing, and thus, throwing a NoSuchObjectException, or returning nulls + // or select * returning empty, depending on what we're testing. + Exception e = null; try { Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned"); @@ -390,10 +367,8 @@ public void testDrops() throws IOException { assertNotNull(e); assertEquals(NoSuchObjectException.class, e.getClass()); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + "_dupe.ptned"); - verifyResults(ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty); + verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1); Exception e2 = null; try { @@ -407,7 +382,6 @@ public void testDrops() throws IOException { } - @Test public void testAlters() throws IOException { @@ -435,24 +409,20 @@ public void testAlters() throws IOException { createTestDataFile(ptn_locn_2, ptn_data_2); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); run("LOAD DATA LOCAL INPATH '" + 
unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2"); - run("SELECT * from " + dbName + ".unptned2"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned2", unptn_data); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); - run("SELECT a from " + dbName + ".ptned WHERE b='1'"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); - run("SELECT a from " + dbName + ".ptned WHERE b='2'"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); - run("SELECT a from " + dbName + ".ptned2 WHERE b='1'"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'",ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); - run("SELECT a from " + dbName + ".ptned2 WHERE b='2'"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2); + + // base tables set up, let's replicate them over advanceDumpDir(); run("REPL DUMP " + dbName); @@ -465,50 +435,53 @@ public void testAlters() throws IOException { run("REPL STATUS " + dbName + "_dupe"); verifyResults(new String[] {replDumpId}); - run("SELECT * from " + dbName + "_dupe.unptned"); - verifyResults(unptn_data); - run("SELECT * from " + dbName + "_dupe.unptned2"); - verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'"); - verifyResults(ptn_data_2); - run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'"); - verifyResults(ptn_data_1); - 
run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'"); - verifyResults(ptn_data_2); + verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data); + verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data); + verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2); + verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2); + // tables have been replicated over, and verified to be identical. Now, we do a couple of + // alters on the source + + // Rename unpartitioned table run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn"); - run("SELECT * from " + dbName + ".unptned_rn"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned_rn", unptn_data); + // Alter unpartitioned table set table property String testKey = "blah"; String testVal = "foo"; run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')"); - try { - Table unptn2 = metaStoreClient.getTable(dbName,"unptned2"); - assertTrue(unptn2.getParameters().containsKey(testKey)); - assertEquals(testVal,unptn2.getParameters().get(testKey)); - } catch (TException e) { - assertNull(e); + if (VERIFY_SETUP_STEPS){ + try { + Table unptn2 = metaStoreClient.getTable(dbName,"unptned2"); + assertTrue(unptn2.getParameters().containsKey(testKey)); + assertEquals(testVal,unptn2.getParameters().get(testKey)); + } catch (TException e) { + assertNull(e); + } } + // alter partitioned table, rename partition run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + ".ptned WHERE b=22"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned 
WHERE b=2", empty); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=22", ptn_data_2); + // alter partitioned table set table property run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')"); - try { - Table ptned = metaStoreClient.getTable(dbName,"ptned"); - assertTrue(ptned.getParameters().containsKey(testKey)); - assertEquals(testVal,ptned.getParameters().get(testKey)); - } catch (TException e) { - assertNull(e); + if (VERIFY_SETUP_STEPS){ + try { + Table ptned = metaStoreClient.getTable(dbName,"ptned"); + assertTrue(ptned.getParameters().containsKey(testKey)); + assertEquals(testVal,ptned.getParameters().get(testKey)); + } catch (TException e) { + assertNull(e); + } } - // No DDL way to alter a partition, so we use the MSC api directly. + // alter partitioned table's partition set partition property + // Note : No DDL way to alter a partition, so we use the MSC api directly. try { List ptnVals1 = new ArrayList(); ptnVals1.add("1"); @@ -519,12 +492,12 @@ public void testAlters() throws IOException { assertNull(e); } - - run("SELECT a from " + dbName + ".ptned2 WHERE b=2"); - verifyResults(ptn_data_2); + // rename partitioned table + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2); run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn"); - run("SELECT a from " + dbName + ".ptned2_rn WHERE b=2"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2); + + // All alters done, now we replicate them over. advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -535,6 +508,9 @@ public void testAlters() throws IOException { printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'"); + // Replication done, we now do the following verifications: + + // verify that unpartitioned table rename succeeded. 
Exception e = null; try { Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "unptned"); @@ -544,7 +520,9 @@ public void testAlters() throws IOException { } assertNotNull(e); assertEquals(NoSuchObjectException.class, e.getClass()); + verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data); + // verify that partition rename succeeded. try { Table unptn2 = metaStoreClient.getTable(dbName + "_dupe" , "unptned2"); assertTrue(unptn2.getParameters().containsKey(testKey)); @@ -553,13 +531,10 @@ public void testAlters() throws IOException { assertNull(te); } - run("SELECT * from " + dbName + "_dupe.unptned_rn"); - verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=22"); - verifyResults(ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2); + // verify that ptned table rename succeeded. Exception e2 = null; try { Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "ptned2"); @@ -569,15 +544,18 @@ public void testAlters() throws IOException { } assertNotNull(e2); assertEquals(NoSuchObjectException.class, e.getClass()); + verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2); + // verify that ptned table property set worked try { Table ptned = metaStoreClient.getTable(dbName + "_dupe" , "ptned"); assertTrue(ptned.getParameters().containsKey(testKey)); - assertEquals(testVal,ptned.getParameters().get(testKey)); + assertEquals(testVal, ptned.getParameters().get(testKey)); } catch (TException te) { assertNull(te); } + // verify that partitioned table partition property set worked. 
try { List ptnVals1 = new ArrayList(); ptnVals1.add("1"); @@ -588,8 +566,6 @@ public void testAlters() throws IOException { assertNull(te); } - run("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2"); - verifyResults(ptn_data_2); } @Test @@ -626,18 +602,14 @@ public void testIncrementalInserts() throws IOException { createTestDataFile(ptn_locn_1, ptn_data_1); createTestDataFile(ptn_locn_2, ptn_data_2); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); - run("SELECT * from " + dbName + ".unptned_empty"); - verifyResults(empty); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned"); run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned"); - run("SELECT * from " + dbName + ".unptned_late"); - verifyResults(unptn_data); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -647,29 +619,24 @@ public void testIncrementalInserts() throws IOException { run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - run("SELECT * from " + dbName + "_dupe.unptned_late"); - verifyResults(unptn_data); + verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); - run("SELECT a from " + dbName + ".ptned WHERE b=1"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", 
ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2); run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + ".ptned WHERE b=1"); - run("SELECT a from " + dbName + ".ptned_late WHERE b=1"); - verifyResults(ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + ".ptned WHERE b=2"); - run("SELECT a from " + dbName + ".ptned_late WHERE b=2"); - verifyResults(ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -679,12 +646,12 @@ public void testIncrementalInserts() throws IOException { run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1"); - verifyResults(ptn_data_1); - run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2"); - verifyResults(ptn_data_2); + + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2); } + private String getResult(int rowNum, int colNum) throws IOException { return getResult(rowNum,colNum,false); } @@ -729,6 +696,18 @@ private void printOutput() throws IOException { } } + private void verifySetup(String cmd, String[] data) throws IOException { + if (VERIFY_SETUP_STEPS){ + run(cmd); + verifyResults(data); + } + } + + private void verifyRun(String cmd, String[] 
data) throws IOException { + run(cmd); + verifyResults(data); + } + private static void run(String cmd) throws RuntimeException { try { run(cmd,false); // default arg-less run simply runs, and does not care about failure diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java index 0fc7f9e..e72a94b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java @@ -19,8 +19,14 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Index; + public abstract class AlterIndexMessage extends EventMessage { + public abstract Index getIndexObjBefore() throws Exception; + + public abstract Index getIndexObjAfter() throws Exception; + protected AlterIndexMessage() { super(EventType.ALTER_INDEX); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java index 867e8ec..d94a3f0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java @@ -19,9 +19,28 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Function; + public abstract class CreateFunctionMessage extends EventMessage { protected CreateFunctionMessage() { super(EventType.CREATE_FUNCTION); } + + public abstract Function getFunctionObj() throws Exception; + + @Override + public EventMessage checkValid() { + try { + if (getFunctionObj() == null) + throw new IllegalStateException("Function object unset."); + } catch (Exception e) { + if (! 
(e instanceof IllegalStateException)){ + throw new IllegalStateException("Event not set up correctly", e); + } else { + throw (IllegalStateException) e; + } + } + return super.checkValid(); + } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java index 81676aa..3ce0d62 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java @@ -19,9 +19,28 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Index; + public abstract class CreateIndexMessage extends EventMessage { protected CreateIndexMessage() { super(EventType.CREATE_INDEX); } + + public abstract Index getIndexObj() throws Exception; + + @Override + public EventMessage checkValid() { + try { + if (getIndexObj() == null) + throw new IllegalStateException("Index object unset."); + } catch (Exception e) { + if (!
(e instanceof IllegalStateException)){ + throw new IllegalStateException("Event not set up correctly", e); + } else { + throw (IllegalStateException) e; + } + } + return super.checkValid(); + } } \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java index 82cdc44..2b45d40 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java @@ -21,7 +21,18 @@ public abstract class DropFunctionMessage extends EventMessage { + public abstract String getFunctionName(); + protected DropFunctionMessage() { super(EventType.DROP_FUNCTION); } + + @Override + public EventMessage checkValid() { + if (getFunctionName() == null){ + throw new IllegalStateException("Function name unset."); + } + return super.checkValid(); + } + } \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java index ce7b760..5997f92 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java @@ -21,7 +21,26 @@ public abstract class DropIndexMessage extends EventMessage { + public abstract String getIndexName(); + public abstract String getOrigTableName(); + public abstract String getIndexTableName(); + protected DropIndexMessage() { super(EventType.DROP_INDEX); } + + @Override + public EventMessage checkValid() { + if (getIndexName() == null){ + throw new IllegalStateException("Index name unset."); + } + if (getOrigTableName() == null){ + throw new IllegalStateException("Index original table name unset."); + } + // NOTE: we do not do a not-null 
check on getIndexTableName, + // since, per the index design wiki, it can actually be null. + + return super.checkValid(); + } + } \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java index aa32908..5dffdd1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterIndexMessage.java @@ -77,6 +77,16 @@ public String getAfterIndexObjJson() { } @Override + public Index getIndexObjBefore() throws Exception { + return (Index) JSONMessageFactory.getTObj(beforeIndexObjJson, Index.class); + } + + @Override + public Index getIndexObjAfter() throws Exception { + return (Index) JSONMessageFactory.getTObj(afterIndexObjJson, Index.class); + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java index a88d85f..3646d85 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateFunctionMessage.java @@ -70,6 +70,11 @@ public String getFunctionObjJson() { } @Override + public Function getFunctionObj() throws Exception { + return (Function) JSONMessageFactory.getTObj(functionObjJson,Function.class); + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java index d376c77..a2e2fc0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateIndexMessage.java @@ -71,6 +71,11 @@ public String getIndexObjJson() { } @Override + public Index getIndexObj() throws Exception { + return (Index) JSONMessageFactory.getTObj(indexObjJson, Index.class); + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java index d994872..b9ee4c4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropFunctionMessage.java @@ -30,7 +30,7 @@ public class JSONDropFunctionMessage extends DropFunctionMessage { @JsonProperty - String server, servicePrincipal, db, functionObjJson; + String server, servicePrincipal, db, functionName; @JsonProperty Long timestamp; @@ -44,12 +44,8 @@ public JSONDropFunctionMessage(String server, String servicePrincipal, Function this.server = server; this.servicePrincipal = servicePrincipal; this.db = fn.getDbName(); + this.functionName = fn.getFunctionName(); this.timestamp = timestamp; - try { - this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn); - } catch (TException ex) { - throw new IllegalArgumentException("Could not serialize Function object", ex); - } checkValid(); } @@ -65,10 +61,6 @@ public JSONDropFunctionMessage(String server, String servicePrincipal, Function @Override public Long getTimestamp() { return timestamp; } - public String getFunctionObjJson() { - return functionObjJson; 
- } - @Override public String toString() { try { @@ -78,4 +70,10 @@ public String toString() { throw new IllegalArgumentException("Could not serialize: ", exception); } } + + @Override + public String getFunctionName() { + return functionName; + } + } \ No newline at end of file diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java index 131d345..f9dd251 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropIndexMessage.java @@ -30,7 +30,7 @@ public class JSONDropIndexMessage extends DropIndexMessage { @JsonProperty - String server, servicePrincipal, db, indexObjJson; + String server, servicePrincipal, db, indexName, origTableName, indexTableName; @JsonProperty Long timestamp; @@ -44,11 +44,9 @@ public JSONDropIndexMessage(String server, String servicePrincipal, Index index, this.server = server; this.servicePrincipal = servicePrincipal; this.db = index.getDbName(); - try { - this.indexObjJson = JSONMessageFactory.createIndexObjJson(index); - } catch (TException ex) { - throw new IllegalArgumentException("Could not serialize Index object", ex); - } + this.indexName = index.getIndexName(); + this.origTableName = index.getOrigTableName(); + this.indexTableName = index.getIndexTableName(); this.timestamp = timestamp; checkValid(); @@ -66,8 +64,16 @@ public JSONDropIndexMessage(String server, String servicePrincipal, Index index, @Override public Long getTimestamp() { return timestamp; } - public String getIndexObjJson() { - return indexObjJson; + public String getIndexName() { + return indexName; + } + + public String getOrigTableName() { + return origTableName; + } + + public String getIndexTableName() { + return indexTableName; } @Override diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 2749371..371ec46 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -313,64 +313,4 @@ public String apply(@Nullable JsonNode input) { return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass); } - // FIXME : remove all methods below this, and expose them from the individual Messages' impl instead. - // TestDbNotificationListener needs a revamp before we remove these methods though. - - public static List getPartitionObjList(ObjectNode jsonTree) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - List partitionObjList = new ArrayList(); - Partition partitionObj = new Partition(); - Iterator jsonArrayIterator = jsonTree.get("partitionListJson").iterator(); - while (jsonArrayIterator.hasNext()) { - deSerializer.deserialize(partitionObj, jsonArrayIterator.next().asText(), "UTF-8"); - partitionObjList.add(partitionObj); - } - return partitionObjList; - } - - public static Function getFunctionObj(ObjectNode jsonTree) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - Function funcObj = new Function(); - String tableJson = jsonTree.get("functionObjJson").asText(); - deSerializer.deserialize(funcObj, tableJson, "UTF-8"); - return funcObj; - } - - public static Index getIndexObj(ObjectNode jsonTree) throws Exception { - return getIndexObj(jsonTree, "indexObjJson"); - } - - public static Index getIndexObj(ObjectNode jsonTree, String indexObjKey) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - Index indexObj = new Index(); - String tableJson = 
jsonTree.get(indexObjKey).asText(); - deSerializer.deserialize(indexObj, tableJson, "UTF-8"); - return indexObj; - } - - /** - * Convert a json ArrayNode to an ordered list of Strings - * - * @param arrayNode: the json array node - * @param elements: the list to populate - * @return - */ - public static List getAsList(ArrayNode arrayNode, List elements) { - Iterator arrayNodeIterator = arrayNode.iterator(); - while (arrayNodeIterator.hasNext()) { - JsonNode node = arrayNodeIterator.next(); - elements.add(node.asText()); - } - return elements; - } - - public static LinkedHashMap getAsMap(ObjectNode objectNode, - LinkedHashMap hashMap) { - Iterator> objectNodeIterator = objectNode.getFields(); - while (objectNodeIterator.hasNext()) { - Map.Entry objectAsMap = objectNodeIterator.next(); - hashMap.put(objectAsMap.getKey(), objectAsMap.getValue().asText()); - } - return hashMap; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 85f8c64..f91d6a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -426,9 +426,6 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { case MessageFactory.ADD_PARTITION_EVENT : { AddPartitionMessage apm = md.getAddPartitionMessage(ev.getMessage()); LOG.info("Processing#{} ADD_PARTITION message : {}", ev.getEventId(), ev.getMessage()); - // FIXME : Current MessageFactory api is lacking, - // and impl is in JSONMessageFactory instead. This needs to be - // refactored correctly so we don't depend on a specific impl. Iterable ptns = apm.getPartitionObjs(); if ((ptns == null) || (!ptns.iterator().hasNext())) { LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");