diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 9e6af8f..39356ae 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.ql.Driver; @@ -295,13 +297,9 @@ public void alterTable() throws Exception { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); - // Parse the message field - ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); - assertEquals(EventType.ALTER_TABLE.toString(), jsonTree.get("eventType").asText()); - assertEquals(defaultDbName, jsonTree.get("db").asText()); - assertEquals(tblName, jsonTree.get("table").asText()); - Table tableObj = JSONMessageFactory.getTableObj(jsonTree); - assertEquals(table, tableObj); + AlterTableMessage alterTableMessage = + JSONMessageFactory.getInstance().getDeserializer().getAlterTableMessage(event.getMessage()); + assertEquals(table, alterTableMessage.getTableObjAfter()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification @@ -485,8 +483,10 @@ public void alterPartition() throws Exception { assertEquals(EventType.ALTER_PARTITION.toString(), jsonTree.get("eventType").asText()); assertEquals(defaultDbName, jsonTree.get("db").asText()); assertEquals(tblName, jsonTree.get("table").asText()); - List partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree); - assertEquals(newPart, partitionObjList.get(0)); + + AlterPartitionMessage alterPartitionMessage = + JSONMessageFactory.getInstance().getDeserializer().getAlterPartitionMessage(event.getMessage()); + assertEquals(newPart, alterPartitionMessage.getPtnObjAfter()); // When hive.metastore.transactional.event.listeners is set, // a failed event should not create a new notification diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index d2696be..e29aa22 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; @@ -24,6 +25,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import 
org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -42,8 +44,10 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import static junit.framework.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -403,6 +407,192 @@ public void testDrops() throws IOException { } + + @Test + public void testAlters() throws IOException { + + String testName = "alters"; + LOG.info("Testing "+testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2"); + run("SELECT * from " + dbName + ".unptned2"); + verifyResults(unptn_data); + + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0,0); + String replDumpId = getResult(0,1,true); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + run("REPL STATUS " + dbName + "_dupe"); + verifyResults(new String[] {replDumpId}); + + run("SELECT * from " + dbName + "_dupe.unptned"); + verifyResults(unptn_data); + run("SELECT * from " + dbName + "_dupe.unptned2"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + 
"_dupe.ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn"); + run("SELECT * from " + dbName + ".unptned_rn"); + verifyResults(unptn_data); + + String testKey = "blah"; + String testVal = "foo"; + run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')"); + try { + Table unptn2 = metaStoreClient.getTable(dbName,"unptned2"); + assertTrue(unptn2.getParameters().containsKey(testKey)); + assertEquals(testVal,unptn2.getParameters().get(testKey)); + } catch (TException e) { + assertNull(e); + } + + run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + ".ptned WHERE b=22"); + verifyResults(ptn_data_2); + + run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')"); + try { + Table ptned = metaStoreClient.getTable(dbName,"ptned"); + assertTrue(ptned.getParameters().containsKey(testKey)); + assertEquals(testVal,ptned.getParameters().get(testKey)); + } catch (TException e) { + assertNull(e); + } + + // No DDL way to alter a partition, so we use the MSC api directly. + try { + List ptnVals1 = new ArrayList(); + ptnVals1.add("1"); + Partition ptn1 = metaStoreClient.getPartition(dbName, "ptned", ptnVals1); + ptn1.getParameters().put(testKey,testVal); + metaStoreClient.alter_partition(dbName,"ptned",ptn1,null); + } catch (TException e) { + assertNull(e); + } + + + run("SELECT a from " + dbName + ".ptned2 WHERE b=2"); + verifyResults(ptn_data_2); + run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn"); + run("SELECT a from " + dbName + ".ptned2_rn WHERE b=2"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String postAlterReplDumpLocn = getResult(0,0); + String postAlterReplDumpId = getResult(0,1,true); + LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'"); + + Exception e = null; + try { + Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "unptned"); + assertNull(tbl); + } catch (TException te) { + e = te; + } + assertNotNull(e); + assertEquals(NoSuchObjectException.class, e.getClass()); + + try { + Table unptn2 = metaStoreClient.getTable(dbName + "_dupe" , "unptned2"); + assertTrue(unptn2.getParameters().containsKey(testKey)); + assertEquals(testVal,unptn2.getParameters().get(testKey)); + } catch (TException te) { + assertNull(te); + } + + run("SELECT * from " + dbName + "_dupe.unptned_rn"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=22"); + verifyResults(ptn_data_2); + + Exception e2 = null; + try { + Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "ptned2"); + assertNull(tbl); + } catch (TException te) { + e2 = te; + } + assertNotNull(e2); + assertEquals(NoSuchObjectException.class, e.getClass()); + + try { + Table ptned = metaStoreClient.getTable(dbName + "_dupe" , "ptned"); + 
assertTrue(ptned.getParameters().containsKey(testKey)); + assertEquals(testVal,ptned.getParameters().get(testKey)); + } catch (TException te) { + assertNull(te); + } + + try { + List<String> ptnVals1 = new ArrayList<String>(); + ptnVals1.add("1"); + Partition ptn1 = metaStoreClient.getPartition(dbName + "_dupe", "ptned", ptnVals1); + assertTrue(ptn1.getParameters().containsKey(testKey)); + assertEquals(testVal,ptn1.getParameters().get(testKey)); + } catch (TException te) { + assertNull(te); + } + + run("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2"); + verifyResults(ptn_data_2); + } + + private String getResult(int rowNum, int colNum) throws IOException { return getResult(rowNum,colNum,false); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java index 26898f2..99c1a93 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + import java.util.List; import java.util.Map; @@ -34,12 +37,16 @@ protected AddPartitionMessage() { */ public abstract String getTable(); + public abstract Table getTableObj() throws Exception; + /** * Getter for list of partitions added. * @return List of maps, where each map identifies values for each partition-key, for every added partition. */ public abstract List<Map<String, String>> getPartitions (); + public abstract Iterable<Partition> getPartitionObjs() throws Exception; + @Override public EventMessage checkValid() { if (getTable() == null) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java index d89dba1..ed6080b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -18,6 +18,9 @@ */ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + import java.util.Map; public abstract class AlterPartitionMessage extends EventMessage { @@ -30,10 +33,32 @@ protected AlterPartitionMessage() { public abstract Map<String, String> getKeyValues(); + public abstract Table getTableObj() throws Exception; + + public abstract Partition getPtnObjBefore() throws Exception; + + public abstract Partition getPtnObjAfter() throws Exception; @Override public EventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); if (getKeyValues() == null) throw new IllegalStateException("Partition values unset"); + try { + if (getTableObj() == null){ + throw new IllegalStateException("Table object not set."); + } + if (getPtnObjAfter() == null){ + throw new IllegalStateException("Partition object(after) not set."); + } + if (getPtnObjBefore() == null){ + throw new IllegalStateException("Partition object(before) not set."); + } + } catch (Exception e) { + if (! 
(e instanceof IllegalStateException)){ + throw new IllegalStateException("Event not set up correctly",e); + } else { + throw (IllegalStateException) e; + } + } return super.checkValid(); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java index 99e678a..5487123 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Table; + public abstract class AlterTableMessage extends EventMessage { protected AlterTableMessage() { @@ -26,9 +28,27 @@ protected AlterTableMessage() { public abstract String getTable(); + public abstract Table getTableObjBefore() throws Exception; + + public abstract Table getTableObjAfter() throws Exception; + @Override public EventMessage checkValid() { if (getTable() == null) throw new IllegalStateException("Table name unset."); + try { + if (getTableObjAfter() == null){ + throw new IllegalStateException("Table object(after) not set."); + } + if (getTableObjBefore() == null){ + throw new IllegalStateException("Table object(before) not set."); + } + } catch (Exception e) { + if (! (e instanceof IllegalStateException)){ + throw new IllegalStateException("Event not set up correctly",e); + } else { + throw (IllegalStateException) e; + } + } return super.checkValid(); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java index c88c59c..e01aa64 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Table; + public abstract class CreateTableMessage extends EventMessage { protected CreateTableMessage() { @@ -26,11 +28,13 @@ protected CreateTableMessage() { } /** - * Getter for the name of table created in HCatalog. + * Getter for the name of table created * @return Table-name (String). 
*/ public abstract String getTable(); + public abstract Table getTableObj() throws Exception; + @Override public EventMessage checkValid() { if (getTable() == null) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java index 972c5c2..94c0173 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPartitionMessage.java @@ -19,12 +19,16 @@ package org.apache.hadoop.hive.metastore.messaging.json; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -53,17 +57,6 @@ public JSONAddPartitionMessage() { } - public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table, - List> partitions, Long timestamp) { - this.server = server; - this.servicePrincipal = servicePrincipal; - this.db = db; - this.table = table; - this.partitions = partitions; - this.timestamp = timestamp; - checkValid(); - } - /** * Note that we get an Iterator rather than an Iterable here: so we can only walk thru the list once */ @@ -111,6 +104,11 @@ public String getTable() { } @Override + public Table getTableObj() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + } + + @Override public Long getTimestamp() { return timestamp; } @@ -120,6 +118,20 @@ public Long getTimestamp() { return partitions; } + @Override + public Iterable getPartitionObjs() throws Exception { + // glorified cast from Iterable to Iterable + return Iterables.transform( + JSONMessageFactory.getTObjs(partitionListJson,Partition.class), + new Function() { + @Nullable + @Override + public Partition apply(@Nullable Object input) { + return (Partition) input; + } + }); + } + public String getTableObjJson() { return tableObjJson; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java index b62fda8..dd1bf3c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -43,7 +43,7 @@ Map keyValues; @JsonProperty - List partitionListJson; + String partitionObjBeforeJson, partitionObjAfterJson; /** * Default constructor, needed for Jackson. 
@@ -51,28 +51,22 @@ public JSONAlterPartitionMessage() { } - public JSONAlterPartitionMessage(String server, String servicePrincipal, String db, String table, - Map keyValues, Long timestamp) { + public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, + Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; - this.db = db; - this.table = table; + this.db = tableObj.getDbName(); + this.table = tableObj.getTableName(); this.timestamp = timestamp; - this.keyValues = keyValues; - checkValid(); - } - - public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, - Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { - this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), - JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore), timestamp); + this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore); try { this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); - partitionListJson = new ArrayList(); - partitionListJson.add(JSONMessageFactory.createPartitionObjJson(partitionObjAfter)); + this.partitionObjBeforeJson = JSONMessageFactory.createPartitionObjJson(partitionObjBefore); + this.partitionObjAfterJson = JSONMessageFactory.createPartitionObjJson(partitionObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } + checkValid(); } @Override @@ -105,12 +99,31 @@ public String getTable() { return keyValues; } + @Override + public Table getTableObj() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + } + + @Override + public Partition getPtnObjBefore() throws Exception { + return (Partition) JSONMessageFactory.getTObj(partitionObjBeforeJson, Partition.class); + } + + @Override + public Partition getPtnObjAfter() throws Exception { + return (Partition) JSONMessageFactory.getTObj(partitionObjAfterJson, Partition.class); + } + public String getTableObjJson() { return tableObjJson; } - public List getPartitionListJson() { - return partitionListJson; + public String getPartitionObjBeforeJson() { + return partitionObjBeforeJson; + } + + public String getPartitionObjAfterJson() { + return partitionObjAfterJson; } @Override diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java index c6e20ce..792015e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -29,7 +29,7 @@ public class JSONAlterTableMessage extends AlterTableMessage { @JsonProperty - String server, servicePrincipal, db, table, tableObjJson; + String server, servicePrincipal, db, table, tableObjBeforeJson, tableObjAfterJson; @JsonProperty Long timestamp; @@ -40,24 +40,20 @@ public JSONAlterTableMessage() { } - public JSONAlterTableMessage(String server, String servicePrincipal, String db, String table, + public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; - this.db = db; - this.table = table; + this.db = tableObjBefore.getDbName(); + this.table = 
tableObjBefore.getTableName(); this.timestamp = timestamp; - checkValid(); - } - - public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObj, - Long timestamp) { - this(server, servicePrincipal, tableObj.getDbName(), tableObj.getTableName(), timestamp); try { - this.tableObjJson = JSONMessageFactory.createTableObjJson(tableObj); + this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore); + this.tableObjAfterJson = JSONMessageFactory.createTableObjJson(tableObjAfter); } catch (TException e) { throw new IllegalArgumentException("Could not serialize: ", e); } + checkValid(); } @Override @@ -85,8 +81,22 @@ public String getTable() { return table; } - public String getTableObjJson() { - return tableObjJson; + @Override + public Table getTableObjBefore() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class); + } + + @Override + public Table getTableObjAfter() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjAfterJson,Table.class); + } + + public String getTableObjBeforeJson() { + return tableObjBeforeJson; + } + + public String getTableObjAfterJson() { + return tableObjAfterJson ; } @Override diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java index aa737ca..4c23625 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateTableMessage.java @@ -85,6 +85,11 @@ public String getTable() { return table; } + @Override + public Table getTableObj() throws Exception { + return (Table) JSONMessageFactory.getTObj(tableObjJson,Table.class); + } + public String getTableObjJson() { return tableObjJson; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 17e7686..9954902 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; +import com.google.common.collect.Iterables; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.Index; @@ -51,6 +52,7 @@ import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -76,6 +78,7 @@ private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName()); private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); + private static TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory()); @Override public MessageDeserializer getDeserializer() { @@ -109,7 +112,7 @@ public CreateTableMessage buildCreateTableMessage(Table table) { @Override public AlterTableMessage buildAlterTableMessage(Table before, Table after) { - return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, 
after, now()); + return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, now()); } @Override @@ -238,6 +241,81 @@ public static Table getTableObj(ObjectNode jsonTree) throws Exception { return tableObj; } + /* + * TODO: Some thoughts here : We have a current todo to move some of these methods over to + * MessageFactory instead of being here, so we can override them, but before we move them over, + * we should keep the following in mind: + * + * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when + * implementing it rather than forcing ourselves down a path wherein returning List is part of + * our interface, and then people use .size() or somesuch which makes us need to materialize + * the entire list and not change. Also, returning Iterables allows us to do things like + * Iterables.transform for some of these. + * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a + * couple of things - firstly, that of serialization format, although that is fine for this + * JSONMessageFactory, and secondly, that makes us just have a number of mappings, one for each + * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific + * item belongs in that event message / event itself, as opposed to in the factory. It's okay to + * have utility accessor methods here that are used by each of the messages to provide accessors. + * I'm adding a couple of those here. + * + */ + + public static TBase getTObj(String tSerialized, Class objClass) throws Exception{ + TBase obj = objClass.newInstance(); + thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8"); + return obj; + } + + + public static Iterable getTObjs( + Iterable objRefStrs, final Class objClass) throws Exception { + + try { + return Iterables.transform(objRefStrs, new com.google.common.base.Function(){ + @Override + public TBase apply(@Nullable String objStr){ + try { + return getTObj(objStr, objClass); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException re){ + // We have to add this bit of exception handling here, because Function.apply does not allow us to throw + // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException + // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns + // a throwable instead of an Exception, we have to account for the possibility that the underlying code + // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the + // RuntimeException is the best thing we can do. + Throwable t = re.getCause(); + if (t instanceof Exception){ + throw (Exception) t; + } else { + throw re; + } + } + } + + // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well + public static Iterable getTObjs( + ObjectNode jsonTree, String objRefListName, final Class objClass) throws Exception { + Iterable jsonArrayIterator = jsonTree.get(objRefListName); + com.google.common.base.Function textExtractor = + new com.google.common.base.Function() { + @Nullable + @Override + public String apply(@Nullable JsonNode input) { + return input.asText(); + } + }; + return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass); + } + + // FIXME : remove all methods below this, and expose them from the individual Messages' impl instead. 
+ // TestDbNotificationListener needs a revamp before we remove these methods though. + public static List getPartitionObjList(ObjectNode jsonTree) throws Exception { TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); List partitionObjList = new ArrayList(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 69ccda7..9b83407 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; @@ -29,15 +29,17 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; -import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -46,15 +48,16 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.IOUtils; @@ -71,6 +74,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -100,6 +104,10 @@ EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"), EVENT_DROP_TABLE("EVENT_DROP_TABLE"), EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"), + EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"), + EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"), + 
EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"), + EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"), EVENT_UNKNOWN("EVENT_UNKNOWN"); String type = null; @@ -134,7 +142,7 @@ public DumpMetaData(Path dumpRoot) { public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){ this(dumpRoot); - setDump(lvl,eventFrom,eventTo); + setDump(lvl, eventFrom, eventTo); } public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){ @@ -151,8 +159,8 @@ public void loadDumpFromFile() throws SemanticException { BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line = null; if ( (line = br.readLine()) != null){ - String[] lineContents = line.split("\t",4); - setDump(DUMPTYPE.valueOf(lineContents[0]),Long.valueOf(lineContents[1]),Long.valueOf(lineContents[2])); + String[] lineContents = line.split("\t", 4); + setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2])); setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]); } else { throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString()); @@ -301,7 +309,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } } Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId(); - LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId); + LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId); // Now that bootstrap has dumped all objects related, we have to account for the changes // that occurred while bootstrap was happening - i.e. we have to look through all events @@ -346,7 +354,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter( - EventUtils.getDbTblNotificationFilter(dbNameOrPattern,tblNameOrPattern), + EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern), EventUtils.getEventBoundaryFilter(eventFrom, eventTo)); EventUtils.MSClientNotificationFetcher evFetcher @@ -358,7 +366,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { while (evIter.hasNext()){ NotificationEvent ev = evIter.next(); Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); - dumpEvent(ev,evRoot); + dumpEvent(ev, evRoot); } LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo); @@ -372,7 +380,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { setFetchTask(createFetchTask(dumpSchema)); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes - LOG.warn("Error during analyzeReplDump",e); + LOG.warn("Error during analyzeReplDump", e); throw new SemanticException(e); } } @@ -384,13 +392,10 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); switch (ev.getEventType()){ case MessageFactory.CREATE_TABLE_EVENT : { - LOG.info("Processing#{} CREATE_TABLE message : {}",ev.getEventId(),ev.getMessage()); + CreateTableMessage ctm = md.getCreateTableMessage(ev.getMessage()); + LOG.info("Processing#{} CREATE_TABLE message : {}", ev.getEventId(), ev.getMessage()); + org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj(); - // FIXME : Current MessageFactory api is lacking, - // and impl is in JSONMessageFactory instead. 
This needs to be - // refactored correctly so we don't depend on a specific impl. - org.apache.hadoop.hive.metastore.api.Table tobj = - JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev)); if (tobj == null){ LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed"); break; @@ -408,41 +413,40 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { // FIXME : dump _files should happen at dbnotif time, doing it here is incorrect // we will, however, do so here, now, for dev/debug's sake. - Path dataPath = new Path(evRoot,"data"); + Path dataPath = new Path(evRoot, "data"); rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf)); - (new DumpMetaData(evRoot,DUMPTYPE.EVENT_CREATE_TABLE,evid,evid)).write(); + (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write(); break; } case MessageFactory.ADD_PARTITION_EVENT : { - LOG.info("Processing#{} ADD_PARTITION message : {}",ev.getEventId(),ev.getMessage()); + AddPartitionMessage apm = md.getAddPartitionMessage(ev.getMessage()); + LOG.info("Processing#{} ADD_PARTITION message : {}", ev.getEventId(), ev.getMessage()); // FIXME : Current MessageFactory api is lacking, // and impl is in JSONMessageFactory instead. This needs to be // refactored correctly so we don't depend on a specific impl. - List ptnObjs = - JSONMessageFactory.getPartitionObjList(JSONMessageFactory.getJsonTree(ev)); - if ((ptnObjs == null) || (ptnObjs.size() == 0)) { + Iterable ptns = apm.getPartitionObjs(); + if ((ptns == null) || (!ptns.iterator().hasNext())) { LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions"); break; } - org.apache.hadoop.hive.metastore.api.Table tobj = - JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev)); + org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj(); if (tobj == null){ LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed"); break; } final Table qlMdTable = new Table(tobj); - List qlPtns = Lists.transform( - ptnObjs, + Iterable qlPtns = Iterables.transform( + ptns, new Function() { @Nullable @Override public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { - if (input == null){ + if (input == null) { return null; } try { - return new Partition(qlMdTable,input); + return new Partition(qlMdTable, input); } catch (HiveException e) { throw new IllegalArgumentException(e); } @@ -461,32 +465,106 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition // FIXME : dump _files should ideally happen at dbnotif time, doing it here introduces // rubberbanding. 
But, till we have support for that, this is our closest equivalent for (Partition qlPtn : qlPtns){ - Path ptnDataPath = new Path(evRoot,qlPtn.getName()); + Path ptnDataPath = new Path(evRoot, qlPtn.getName()); rootTasks.add(ReplCopyTask.getDumpCopyTask( replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf)); } - (new DumpMetaData(evRoot,DUMPTYPE.EVENT_ADD_PARTITION,evid,evid)).write(); + (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write(); break; } case MessageFactory.DROP_TABLE_EVENT : { - LOG.info("Processing#{} DROP_TABLE message : {}",ev.getEventId(),ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_TABLE,evid,evid); + LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid); dmd.setPayload(ev.getMessage()); dmd.write(); break; } case MessageFactory.DROP_PARTITION_EVENT : { - LOG.info("Processing#{} DROP_PARTITION message : {}",ev.getEventId(),ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_DROP_PARTITION,evid,evid); + LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid); dmd.setPayload(ev.getMessage()); dmd.write(); break; } + case MessageFactory.ALTER_TABLE_EVENT : { + LOG.info("Processing#{} ALTER_TABLE message : {}", ev.getEventId(), ev.getMessage()); + AlterTableMessage atm = md.getAlterTableMessage(ev.getMessage()); + org.apache.hadoop.hive.metastore.api.Table tobjBefore = atm.getTableObjBefore(); + org.apache.hadoop.hive.metastore.api.Table tobjAfter = atm.getTableObjAfter(); + + if (tobjBefore.getDbName().equals(tobjAfter.getDbName()) && + tobjBefore.getTableName().equals(tobjAfter.getTableName())){ + // regular alter scenario + replicationSpec.setIsMetadataOnly(true); + Table qlMdTableAfter = new Table(tobjAfter); + Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(conf), + metaDataPath, + qlMdTableAfter, + null, + replicationSpec); + + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + } else { + // rename scenario + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + } + + break; + } + case MessageFactory.ALTER_PARTITION_EVENT : { + LOG.info("Processing#{} ALTER_PARTITION message : {}", ev.getEventId(), ev.getMessage()); + AlterPartitionMessage apm = md.getAlterPartitionMessage(ev.getMessage()); + org.apache.hadoop.hive.metastore.api.Table tblObj = apm.getTableObj(); + org.apache.hadoop.hive.metastore.api.Partition pobjBefore = apm.getPtnObjBefore(); + org.apache.hadoop.hive.metastore.api.Partition pobjAfter = apm.getPtnObjAfter(); + + boolean renameScenario = false; + Iterator beforeValIter = pobjBefore.getValuesIterator(); + Iterator afterValIter = pobjAfter.getValuesIterator(); + for ( ; beforeValIter.hasNext() ; ){ + if (!beforeValIter.next().equals(afterValIter.next())){ + renameScenario = true; + break; + } + } + + if (!renameScenario){ + // regular partition alter + replicationSpec.setIsMetadataOnly(true); + Table qlMdTable = new Table(tblObj); + List qlPtns = new ArrayList(); + qlPtns.add(new Partition(qlMdTable, pobjAfter)); + Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); + 
EximUtil.createExportDump( + metaDataPath.getFileSystem(conf), + metaDataPath, + qlMdTable, + qlPtns, + replicationSpec); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + break; + } else { + // rename scenario + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + break; + } + + } // TODO : handle other event types default: - LOG.info("Dummy processing#{} message : {}",ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot,DUMPTYPE.EVENT_UNKNOWN,evid,evid); + LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid); dmd.setPayload(ev.getMessage()); dmd.write(); break; @@ -666,7 +744,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); if (srcs == null || (srcs.length == 0)) { - LOG.warn("Nothing to load at {}",loadPath.toUri().toString()); + LOG.warn("Nothing to load at {}", loadPath.toUri().toString()); return; } @@ -729,12 +807,16 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); } LOG.debug("Updated taskChainTail from {}{} to {}{}", - taskChainTail.getClass(),taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); + taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; evstage++; } } - LOG.debug("added evTaskRoot {}:{}",evTaskRoot.getClass(),evTaskRoot.getId()); + // TODO : Over here, we need to track a Map for every db updated + // and update repl.last.id for each, if this is a wh-level load, and if it is a db-level load, + // then a single repl.last.id update, and if this is a tbl-lvl load which does not alter the + // table itself, we'll need to update repl.last.id for that as well. + LOG.debug("added evTaskRoot {}:{}", evTaskRoot.getClass(), evTaskRoot.getId()); rootTasks.add(evTaskRoot); } @@ -751,6 +833,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // Currently handles only create-tbl & insert-ptn, since only those are dumped // As we add more event types, this will expand. DumpMetaData dmd = new DumpMetaData(new Path(locn)); + MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); switch (dmd.getDumpType()) { case EVENT_CREATE_TABLE: { return analyzeTableLoad(dbName, tblName, locn, precursor); @@ -759,7 +842,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { return analyzeTableLoad(dbName, tblName, locn, precursor); } case EVENT_DROP_TABLE: { - MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload()); DropTableDesc dropTableDesc = new DropTableDesc( dbName + "." + (tblName == null ? 
dropTableMessage.getTable() : tblName), @@ -774,7 +856,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { return tasks; } case EVENT_DROP_PARTITION: { - MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload()); Map> partSpecs = genPartSpecs(dropPartitionMessage.getPartitions()); if (partSpecs.size() > 0){ @@ -794,6 +875,88 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { throw new SemanticException("DROP PARTITION EVENT does not return any part descs for event message :"+dmd.getPayload()); } } + case EVENT_ALTER_TABLE: { + return analyzeTableLoad(dbName, tblName, locn, precursor); + } + case EVENT_RENAME_TABLE: { + AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload()); + if ((tblName != null) && (!tblName.isEmpty())){ + throw new SemanticException("RENAMES of tables are not supported for table-level replication"); + } + try { + String oldDbName = renameTableMessage.getTableObjBefore().getDbName(); + String newDbName = renameTableMessage.getTableObjAfter().getDbName(); + + if ((dbName != null) && (!dbName.isEmpty())){ + // If we're loading into a db, instead of into the warehouse, then the oldDbName and + // newDbName must be the same + if (!oldDbName.equalsIgnoreCase(newDbName)){ + throw new SemanticException("Cannot replicate an event renaming a table across" + + " databases into a db level load " + oldDbName +"->" + newDbName); + } else { + // both were the same, and can be replaced by the new db we're loading into. + oldDbName = dbName; + newDbName = dbName; + } + } + + String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName(); + String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName(); + AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false); + Task renameTableTask = TaskFactory.get(new DDLWork(inputs, outputs, renameTableDesc), conf); + if (precursor != null){ + precursor.addDependentTask(renameTableTask); + } + List> tasks = new ArrayList>(); + tasks.add(renameTableTask); + LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName); + return tasks; + } catch (Exception e) { + if (!(e instanceof SemanticException)){ + throw new SemanticException("Error reading message members", e); + } else { + throw (SemanticException)e; + } + } + } + case EVENT_ALTER_PARTITION: { + return analyzeTableLoad(dbName, tblName, locn, precursor); + } + case EVENT_RENAME_PARTITION: { + AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload()); + + Map newPartSpec = new LinkedHashMap(); + Map oldPartSpec = new LinkedHashMap(); + String tableName = dbName + "." + + ((tblName == null || tblName.isEmpty()) ? 
renamePtnMessage.getTable() : tblName); + try { + org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj(); + org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore(); + org.apache.hadoop.hive.metastore.api.Partition pobjAfter = renamePtnMessage.getPtnObjAfter(); + Iterator beforeValIter = pobjBefore.getValuesIterator(); + Iterator afterValIter = pobjAfter.getValuesIterator(); + for (FieldSchema fs : tblObj.getPartitionKeys()){ + oldPartSpec.put(fs.getName(), beforeValIter.next()); + newPartSpec.put(fs.getName(), afterValIter.next()); + } + } catch (Exception e) { + if (!(e instanceof SemanticException)){ + throw new SemanticException("Error reading message members", e); + } else { + throw (SemanticException)e; + } + } + + RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec); + Task renamePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, renamePtnDesc), conf); + if (precursor != null){ + precursor.addDependentTask(renamePtnTask); + } + List> tasks = new ArrayList>(); + tasks.add(renamePtnTask); + LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); + return tasks; + } case EVENT_UNKNOWN: { break; } @@ -836,7 +999,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } } if (ptnDescs.size() > 0){ - partSpecs.put(partPrefixLength,ptnDescs); + partSpecs.put(partPrefixLength, ptnDescs); } return partSpecs; } @@ -992,7 +1155,7 @@ private void prepareReturnValues(List values, String schema) throws Sema LOG.debug(" > " + s); } ctx.setResFile(ctx.getLocalTmpPath()); - writeOutput(values,ctx.getResFile()); + writeOutput(values, ctx.getResFile()); } private void writeOutput(List values, Path outputFile) throws SemanticException { @@ -1016,7 +1179,7 @@ private void writeOutput(List values, Path outputFile) throws SemanticEx private ReplicationSpec getNewReplicationSpec() throws SemanticException { try { - ReplicationSpec rspec = getNewReplicationSpec("replv2","will-be-set"); + ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set"); rspec.setCurrentReplicationState(String.valueOf(db.getMSC() .getCurrentNotificationEventId().getEventId())); return rspec; @@ -1032,7 +1195,7 @@ private ReplicationSpec getNewReplicationSpec(String evState, String objState) t // Use for replication states focussed on event only, where the obj state will be the event state private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException { - return getNewReplicationSpec(evState,evState); + return getNewReplicationSpec(evState, evState); } private Iterable matchesTbl(String dbName, String tblPattern) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 060f2a1..402d96f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -258,6 +258,10 @@ public boolean isMetadataOnly(){ return isMetadataOnly; } + public void setIsMetadataOnly(boolean isMetadataOnly){ + this.isMetadataOnly = isMetadataOnly; + } + /** * @return the replication state of the event that spawned this statement */
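
For reference, a minimal consumer-side sketch (not part of the patch; the class and method names are hypothetical) of how the typed accessors added above are meant to be used instead of walking the JSON tree via JSONMessageFactory.getJsonTree(). It assumes the NotificationEvent has already been filtered to the matching event type, the same way dumpEvent() in ReplicationSemanticAnalyzer switches on ev.getEventType():

import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

public class AlterEventConsumerSketch {
  // Same deserializer lookup the patch uses in ReplicationSemanticAnalyzer.dumpEvent().
  private static final MessageDeserializer md = MessageFactory.getInstance().getDeserializer();

  // Assumes 'event' is an ALTER_TABLE notification.
  static boolean isTableRename(NotificationEvent event) throws Exception {
    AlterTableMessage atm = md.getAlterTableMessage(event.getMessage());
    Table before = atm.getTableObjBefore();
    Table after = atm.getTableObjAfter();
    // A rename shows up as a db or table name change between the before/after objects.
    return !before.getDbName().equals(after.getDbName())
        || !before.getTableName().equals(after.getTableName());
  }

  // Assumes 'event' is an ALTER_PARTITION notification.
  static Partition partitionAfterAlter(NotificationEvent event) throws Exception {
    AlterPartitionMessage apm = md.getAlterPartitionMessage(event.getMessage());
    return apm.getPtnObjAfter();
  }
}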