diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
index 153e51a..c09e687 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
@@ -19,12 +19,17 @@
 package org.apache.hive.hcatalog.api;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
 import org.apache.hive.hcatalog.messaging.CreateTableMessage;
+import org.apache.hive.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hive.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hive.hcatalog.messaging.DropTableMessage;
 import org.apache.hive.hcatalog.messaging.MessageDeserializer;
 import org.apache.hive.hcatalog.messaging.MessageFactory;
 import org.slf4j.Logger;
@@ -87,8 +92,8 @@ public void createDatabase() throws Exception {
     assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
     assertEquals("myhcatdb", event.getDbName());
     assertNull(event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"myhcatdb\",\"timestamp\":[0-9]+}"));
+    CreateDatabaseMessage createDatabaseMessage = md.getCreateDatabaseMessage(event.getMessage());
+    assertEquals("myhcatdb", createDatabaseMessage.getDB());
   }
 
   @Test
@@ -106,8 +111,8 @@ public void dropDatabase() throws Exception {
     assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
     assertEquals(dbname, event.getDbName());
     assertNull(event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"hcatdropdb\",\"timestamp\":[0-9]+}"));
+    DropDatabaseMessage dropDatabaseMessage = md.getDropDatabaseMessage(event.getMessage());
+    assertEquals(dbname, dropDatabaseMessage.getDB());
   }
 
   @Test
@@ -158,9 +163,10 @@ public void dropTable() throws Exception {
     assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
     assertEquals(dbName, event.getDbName());
     assertEquals(tableName, event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"hcatdroptable\",\"timestamp\":[0-9]+}"));
+
+    DropTableMessage dropTableMessage = md.getDropTableMessage(event.getMessage());
+    assertEquals(dbName, dropTableMessage.getDB());
+    assertEquals(tableName, dropTableMessage.getTable());
   }
 
   @Test
@@ -232,11 +238,17 @@ public void dropPartition() throws Exception {
     assertEquals(firstEventId + 3, event.getEventId());
     assertTrue(event.getEventTime() >= startTime);
     assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
-    assertEquals("default", event.getDbName());
+    assertEquals(dbName, event.getDbName());
     assertEquals(tableName, event.getTableName());
-    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
-        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
-        "\"hcatdropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+
+    // Parse the message field
+    DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(event.getMessage());
+    assertEquals(dbName, dropPartitionMessage.getDB());
+    assertEquals(tableName, dropPartitionMessage.getTable());
+    List<Map<String, String>> droppedPartSpecs = dropPartitionMessage.getPartitions();
+    assertNotNull(droppedPartSpecs);
+    assertEquals(1, droppedPartSpecs.size());
+    assertEquals(partSpec, droppedPartSpecs.get(0));
   }
 
   @Test
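The hunks above replace brittle regex assertions over the raw JSON payload with the deserializer's typed accessors. A minimal sketch of that pattern, assuming the test's usual fixtures (`event`, `dbName`, `tableName`, and JUnit's static asserts); the exact factory wiring may differ from the test class:

    // Sketch only: parse the payload, then assert on typed fields instead of
    // regex-matching serialized JSON (which breaks whenever a field is added).
    MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
    DropPartitionMessage msg = md.getDropPartitionMessage(event.getMessage());
    assertEquals(dbName, msg.getDB());
    assertEquals(tableName, msg.getTable());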
"\"hcatdropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}")); + + // Parse the message field + DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(event.getMessage()); + assertEquals(dbName, dropPartitionMessage.getDB()); + assertEquals(tableName, dropPartitionMessage.getTable()); + List> droppedPartSpecs = dropPartitionMessage.getPartitions(); + assertNotNull(droppedPartSpecs); + assertEquals(1,droppedPartSpecs.size()); + assertEquals(partSpec,droppedPartSpecs.get(0)); } @Test diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 288e914..6adab3c 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -550,6 +550,10 @@ public void dropPartition() throws Exception { DropPartitionMessage dropPtnMsg = md.getDropPartitionMessage(event.getMessage()); assertEquals(defaultDbName, dropPtnMsg.getDB()); assertEquals(tblName, dropPtnMsg.getTable()); + Table tableObj = dropPtnMsg.getTableObj(); + assertEquals(table.getDbName(), tableObj.getDbName()); + assertEquals(table.getTableName(), tableObj.getTableName()); + assertEquals(table.getOwner(), tableObj.getOwner()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 5bdbfd5..778c13a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -291,6 +291,7 @@ public void testDrops() throws IOException { run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE"); String[] unptn_data = new String[]{ "eleven" , "twelve" }; String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; @@ -315,6 +316,11 @@ public void testDrops() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1); run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)"); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=1", ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)"); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2); + // At this point, we've set up all the tables and ptns we're going to test drops across // Replicate it first, and then we'll drop it on the source. 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 5bdbfd5..778c13a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -291,6 +291,7 @@ public void testDrops() throws IOException {
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE");
 
     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
@@ -315,6 +316,11 @@ public void testDrops() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1);
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
     verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1", ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)");
+    verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=2", ptn_data_2);
+
     // At this point, we've set up all the tables and ptns we're going to test drops across
     // Replicate it first, and then we'll drop it on the source.
@@ -333,18 +339,23 @@ public void testDrops() throws IOException {
     verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
     verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
     verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2);
 
     // All tables good on destination, drop on source.
     run("DROP TABLE " + dbName + ".unptned");
     run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
     run("DROP TABLE " + dbName + ".ptned2");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty);
+    run("ALTER TABLE " + dbName + ".ptned3 DROP PARTITION (b=1)");
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty);
     verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1);
+    verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2);
 
     // replicate the incremental drops
-    advanceDumpDir();;
+    advanceDumpDir();
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     String postDropReplDumpLocn = getResult(0,0);
     String postDropReplDumpId = getResult(0,1,true);
@@ -367,8 +378,10 @@ public void testDrops() throws IOException {
     assertNotNull(e);
     assertEquals(NoSuchObjectException.class, e.getClass());
 
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty);
     verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2);
 
     Exception e2 = null;
     try {
@@ -379,7 +392,6 @@ public void testDrops() throws IOException {
     }
     assertNotNull(e2);
     assertEquals(NoSuchObjectException.class, e.getClass());
-
   }
 
   @Test
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
index 26aecb3..0dd3e50 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.metastore.api.Table;
+
 public abstract class DropPartitionMessage extends EventMessage {
 
   protected DropPartitionMessage() {
@@ -29,6 +31,9 @@ protected DropPartitionMessage() {
   }
 
   public abstract String getTable();
+
+  public abstract Table getTableObj() throws Exception;
+
   public abstract List<Map<String, String>> getPartitions ();
 
   @Override
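The new abstract accessor means a consumer is no longer limited to db/table names; it can recover the full Thrift Table, including the partition key schema. A hypothetical consumer (the names `deserializer` and `payload` are stand-ins):

    // With the table object in hand, the partition key types become available,
    // which is exactly what the replication path further down needs.
    DropPartitionMessage msg = deserializer.getDropPartitionMessage(payload);
    Table t = msg.getTableObj();                        // declared to throw Exception
    List<FieldSchema> partCols = t.getPartitionKeys();  // typed partition schema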
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
index b8ea224..f1860af 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropPartitionMessage.java
@@ -19,7 +19,9 @@
 
 package org.apache.hadoop.hive.metastore.messaging.json;
 
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 import java.util.List;
@@ -31,7 +33,7 @@ public class JSONDropPartitionMessage extends DropPartitionMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db, table;
+  String server, servicePrincipal, db, table, tableObjJson;
 
   @JsonProperty
   Long timestamp;
@@ -56,6 +58,17 @@ public JSONDropPartitionMessage(String server, String servicePrincipal, String d
     checkValid();
   }
 
+  public JSONDropPartitionMessage(String server, String servicePrincipal, Table tableObj,
+      List<Map<String, String>> partitionKeyValues, long timestamp) {
+    this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(),
+        partitionKeyValues, timestamp);
+    try {
+      this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj);
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize: ", e);
+    }
+  }
+
   @Override
   public String getServer() {
     return server;
@@ -87,6 +100,15 @@ public Long getTimestamp() {
   }
 
   @Override
+  public Table getTableObj() throws Exception {
+    return (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+  }
+
+  public String getTableObjJson() {
+    return tableObjJson;
+  }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 371ec46..f66a2a3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -138,8 +138,8 @@ public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition b
 
   @Override
   public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitionsIterator) {
-    return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table.getDbName(),
-        table.getTableName(), getPartitionKeyValues(table, partitionsIterator), now());
+    return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table,
+        getPartitionKeyValues(table, partitionsIterator), now());
   }
 
   @Override
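Taken together, the two files above give a round trip: the factory serializes the Table into the message's `tableObjJson` field at build time, and the deserialized message rehydrates it. A sketch under this patch's API (`table` and `parts` are stand-ins; `toString()` on the JSON message emits its JSON form):

    // Build the message with the full table object, serialize, deserialize,
    // and recover the embedded Table on the other side.
    DropPartitionMessage built =
        MessageFactory.getInstance().buildDropPartitionMessage(table, parts.iterator());
    DropPartitionMessage read = MessageFactory.getInstance().getDeserializer()
        .getDropPartitionMessage(built.toString());
    Table roundTripped = read.getTableObj();  // reconstructed from tableObjJson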
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index f91d6a7..98cd3b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
+
 import javax.annotation.Nullable;
 
 import java.io.BufferedReader;
@@ -890,23 +891,38 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         return tasks;
       }
       case EVENT_DROP_PARTITION: {
-        DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
-        Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = genPartSpecs(dropPartitionMessage.getPartitions());
-        if (partSpecs.size() > 0){
-          DropTableDesc dropPtnDesc = new DropTableDesc(
-              dbName + "." + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs,
-              null, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
-          Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
-          if (precursor != null){
-            precursor.addDependentTask(dropPtnTask);
+        try {
+          DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
+          Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs;
+          partSpecs =
+              genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
+                  dropPartitionMessage.getPartitions());
+          if (partSpecs.size() > 0) {
+            DropTableDesc dropPtnDesc =
+                new DropTableDesc(dbName + "."
+                    + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs, null,
+                    true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+            Task<DDLWork> dropPtnTask =
+                TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
+            if (precursor != null) {
+              precursor.addDependentTask(dropPtnTask);
+            }
+            List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+            tasks.add(dropPtnTask);
+            LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
+                dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
+            return tasks;
+          } else {
+            throw new SemanticException(
+                "DROP PARTITION EVENT does not return any part descs for event message :"
+                    + dmd.getPayload());
+          }
+        } catch (Exception e) {
+          if (!(e instanceof SemanticException)) {
+            throw new SemanticException("Error reading message members", e);
+          } else {
+            throw (SemanticException) e;
           }
-          List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-          tasks.add(dropPtnTask);
-          LOG.debug("Added drop ptn task : {}:{},{}",
-              dropPtnTask.getId(), dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
-          return tasks;
-        } else {
-          throw new SemanticException("DROP PARTITION EVENT does not return any part descs for event message :"+dmd.getPayload());
         }
       }
       case EVENT_ALTER_TABLE: {
@@ -1007,38 +1023,37 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     return null;
   }
 
-  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
-
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
+      List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
+        new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
    int partPrefixLength = 0;
     if ((partitions != null) && (partitions.size() > 0)) {
       partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of key-vals.
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of
+      // key-vals.
     }
     List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
-    for (Map<String,String> ptn : partitions){
+    for (Map<String, String> ptn : partitions) {
       // convert each key-value-map to appropriate expression.
       ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String,String> kvp : ptn.entrySet()){
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
         String key = kvp.getKey();
         Object val = kvp.getValue();
-        // FIXME : bug here, value is being placed as a String, but should actually be the underlying type
-        // as converted to it by looking at the table's col schema. To do that, however, we need the
-        // tableObjJson from the DropTableMessage. So, currently, this will only work for partitions for
-        // which the partition keys are all strings. So, for now, we hardcode it, but we need to fix this.
-        String type = "string";
-
+        String type = table.getPartColByName(key).getType();
         PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
         ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, val));
+        ExprNodeGenericFuncDesc op =
+            DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, val));
         expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
       }
       if (expr != null) {
         ptnDescs.add(expr);
       }
     }
-    if (ptnDescs.size() > 0){
+    if (ptnDescs.size() > 0) {
       partSpecs.put(partPrefixLength, ptnDescs);
     }
     return partSpecs;
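To make the effect of the genPartSpecs change concrete: for the `ptned3` test table above, key `b` now resolves to type `int` through the table object, so the generated drop predicate compares as an int rather than the previously hardcoded string. A worked sketch of the expression the loop builds for `b=1` (the value arrives as the string "1" from the event's key-value map):

    String type = table.getPartColByName("b").getType();   // "int", no longer hardcoded "string"
    PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
    ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, "b", null, true);
    ExprNodeGenericFuncDesc expr =
        DDLSemanticAnalyzer.makeBinaryPredicate("=", column, new ExprNodeConstantDesc(pti, "1"));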