commit eb5ec12fe8b4184d1e4fbe0b40f5ffad462e3da1
Author: Daniel Dai
Date:   Thu Jan 26 14:49:01 2017 -0800

    HIVE-15705: Event replication for constraints

diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4df2758..8e8c7a9 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -39,9 +39,13 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
@@ -49,6 +53,7 @@
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropConstraintEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
@@ -461,6 +466,57 @@ public void onLoadPartitionDone(LoadPartitionDoneEvent partSetDoneEvent) throws
     // then load data into it.
   }

+  /***
+   * @param addPrimaryKeyEvent add primary key event
+   * @throws MetaException
+   */
+  @Override
+  public void onAddPrimaryKey(AddPrimaryKeyEvent addPrimaryKeyEvent) throws MetaException {
+    List<SQLPrimaryKey> cols = addPrimaryKeyEvent.getPrimaryKeyCols();
+    if (cols.size() > 0) {
+      NotificationEvent event =
+          new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(), msgFactory
+              .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols()).toString());
+      event.setDbName(cols.get(0).getTable_db());
+      event.setTableName(cols.get(0).getTable_name());
+      process(event);
+    }
+  }
+
+  /***
+   * @param addForeignKeyEvent add foreign key event
+   * @throws MetaException
+   */
+  @Override
+  public void onAddForeignKey(AddForeignKeyEvent addForeignKeyEvent) throws MetaException {
+    List<SQLForeignKey> cols = addForeignKeyEvent.getForeignKeyCols();
+    if (cols.size() > 0) {
+      NotificationEvent event =
+          new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(), msgFactory
+              .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols()).toString());
+      event.setDbName(cols.get(0).getPktable_db());
+      event.setTableName(cols.get(0).getPktable_name());
+      process(event);
+    }
+  }
+
+  /***
+   * @param dropConstraintEvent drop constraint event
+   * @throws MetaException
+   */
+  @Override
+  public void onDropConstraint(DropConstraintEvent dropConstraintEvent) throws MetaException {
+    String dbName = dropConstraintEvent.getDbName();
+    String tableName = dropConstraintEvent.getTableName();
+    String constraintName = dropConstraintEvent.getConstraintName();
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(), msgFactory
+            .buildDropConstraintMessage(dbName, tableName, constraintName).toString());
+    event.setDbName(dbName);
+    event.setTableName(tableName);
+    process(event);
+  }
+
   private int now() {
     long millis = System.currentTimeMillis();
     millis /= 1000;
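Note on consuming these events: once the listener records ADD_PRIMARYKEY, ADD_FOREIGNKEY, and DROP_CONSTRAINT entries, a downstream reader can poll them from the notification log like any other metastore event. A minimal sketch of such a tailer, assuming the event-type strings match the EventType enum names used above and that a metastore is reachable through the local HiveConf; the class name, starting id, and batch size are illustrative, not part of this patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;

    public class ConstraintEventTailer {
      public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        long lastSeenId = 0; // a real consumer would persist the last id it processed
        NotificationFilter constraintsOnly = new NotificationFilter() {
          @Override
          public boolean accept(NotificationEvent event) {
            String type = event.getEventType();
            return "ADD_PRIMARYKEY".equals(type) || "ADD_FOREIGNKEY".equals(type)
                || "DROP_CONSTRAINT".equals(type);
          }
        };
        // Fetch up to 100 constraint-related events recorded after lastSeenId.
        NotificationEventResponse rsp = client.getNextNotification(lastSeenId, 100, constraintsOnly);
        for (NotificationEvent e : rsp.getEvents()) {
          // The message payload is whatever msgFactory serialized above.
          System.out.println(e.getEventId() + " " + e.getEventType() + " "
              + e.getDbName() + "." + e.getTableName());
        }
        client.close();
      }
    }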
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 5282a5a..af326d6 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -892,9 +892,10 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
   }

   @Override
-  public void createTableWithConstraints(Table tbl,
+  public List<String> createTableWithConstraints(Table tbl,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
       throws InvalidObjectException, MetaException {
+    return null;
   }

   @Override
@@ -903,12 +904,14 @@ public void dropConstraint(String dbName, String tableName,
   }

   @Override
-  public void addPrimaryKeys(List<SQLPrimaryKey> pks)
+  public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
       throws InvalidObjectException, MetaException {
+    return null;
   }

   @Override
-  public void addForeignKeys(List<SQLForeignKey> fks)
+  public List<String> addForeignKeys(List<SQLForeignKey> fks)
       throws InvalidObjectException, MetaException {
+    return null;
   }
 }
\ No newline at end of file
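Why the RawStore signatures above change from void to a List return: constraint names can be auto-generated by the store, so the caller needs them back. The element type is rendered as String here on that assumption; this test stub returns null because it never generates names. A hedged caller-side sketch of that contract — the backfill loop is an illustration, not code from this patch:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.RawStore;
    import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;

    public class ConstraintNameBackfill {
      // Add primary keys through the store, then copy generated constraint names
      // back onto the key columns (assumes the returned list parallels the input).
      static void addAndBackfill(RawStore store, List<SQLPrimaryKey> pks)
          throws InvalidObjectException, MetaException {
        List<String> names = store.addPrimaryKeys(pks);
        if (names != null) {
          for (int i = 0; i < pks.size(); i++) {
            if (pks.get(i).getPk_name() == null) {
              pks.get(i).setPk_name(names.get(i));
            }
          }
        }
      }
    }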
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 7836c47..c584b07 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -23,8 +23,12 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
@@ -58,7 +62,7 @@
   final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
       // FIXME : replace with hive copy once that is copied
   final static String tid =
-      TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
+      TestReplicationScenarios.class.getCanonicalName().toLowerCase().replace('.','_') + "_" + System.currentTimeMillis();
   final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;

   static HiveConf hconf;
@@ -66,6 +70,10 @@
   static int msPort;
   static Driver driver;
   static HiveMetaStoreClient metaStoreClient;
+  static HiveConf hconfMirror;
+  static int msPortMirror;
+  static Driver driverMirror;
+  static HiveMetaStoreClient metaStoreClientMirror;

   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private ArrayList<String> lastResults;
@@ -90,11 +98,16 @@ public static void setUpBeforeClass() throws Exception {
       WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
     }

-    hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+    HiveConf hconfServer = new HiveConf();
+    hconfServer.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
-    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+
+    hconfServer.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:;databaseName=metastore_db;create=true");
+    hconfServer.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconfServer.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+    msPort = MetaStoreUtils.startMetaStore(hconfServer);
+    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
-    msPort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
@@ -113,6 +126,15 @@ public static void setUpBeforeClass() throws Exception {
     driver = new Driver(hconf);
     SessionState.start(new CliSessionState(hconf));
     metaStoreClient = new HiveMetaStoreClient(hconf);
+
+    HiveConf hconfMirrorServer = new HiveConf();
+    hconfMirrorServer.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:;databaseName=metastore_db2;create=true");
+    msPortMirror = MetaStoreUtils.startMetaStore(hconfMirrorServer);
+    hconfMirror = new HiveConf(hconf);
+    hconfMirror.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+        + msPortMirror);
+    driverMirror = new Driver(hconfMirror);
+    metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror);
   }

   @AfterClass
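The call-site churn in the tests below all comes from one mechanical change: run, verifySetup, verifyRun, verifyResults, getResult, and printOutput now take the Driver to execute against, so each statement explicitly targets the source warehouse (driver) or the replica (driverMirror). The helper bodies sit outside this excerpt; a hypothetical sketch of the shape they presumably take — Driver.run and CommandProcessorResponse are real Hive APIs, the rest is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;

    class DualWarehouseHelper {
      // Execute a statement on whichever warehouse's Driver the caller passes in.
      static boolean run(String cmd, Driver myDriver) throws IOException {
        try {
          CommandProcessorResponse ret = myDriver.run(cmd);
          return ret.getResponseCode() == 0;
        } catch (Exception e) {
          throw new IOException(e);
        }
      }
    }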
dbName + ".unptned(a string) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE"); + run("CREATE DATABASE " + dbName, driver); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); String[] unptn_data = new String[]{ "eleven" , "twelve" }; String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; @@ -168,30 +190,30 @@ public void testBasic() throws IOException { createTestDataFile(ptn_locn_1, ptn_data_1); createTestDataFile(ptn_locn_2, ptn_data_2); - run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2); - verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); - verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); advanceDumpDir(); - run("REPL DUMP " + dbName); - String replDumpLocn = getResult(0,0); - String replDumpId = getResult(0,1,true); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - - verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId); - - verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2); - verifyRun("SELECT a from " + dbName + ".ptned_empty", empty); - verifyRun("SELECT * from " + dbName + ".unptned_empty", empty); + run("REPL DUMP " + dbName, driver); + String replDumpLocn = getResult(0,0,driver); + String replDumpId = getResult(0,1,true,driver); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + + verifyRun("REPL STATUS " + dbName + "_dupe", 
@@ -201,11 +223,11 @@ public void testBasicWithCM() throws Exception {
     LOG.info("Testing "+testName);
     String dbName = testName + "_" + tid;

-    run("CREATE DATABASE " + dbName);
-    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE DATABASE " + dbName, driver);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);

     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
@@ -223,29 +245,29 @@ public void testBasicWithCM() throws Exception {
     createTestDataFile(ptn_locn_2, ptn_data_2);
     createTestDataFile(ptn_locn_2_later, ptn_data_2_later);

-    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
-    run("SELECT * from " + dbName + ".unptned");
-    verifyResults(unptn_data);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
-    run("SELECT a from " + dbName + ".ptned WHERE b=1");
-    verifyResults(ptn_data_1);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
-    run("SELECT a from " + dbName + ".ptned WHERE b=2");
-    verifyResults(ptn_data_2);
-    run("SELECT a from " + dbName + ".ptned_empty");
-    verifyResults(empty);
-    run("SELECT * from " + dbName + ".unptned_empty");
-    verifyResults(empty);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    run("SELECT * from " + dbName + ".unptned", driver);
+    verifyResults(unptn_data, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+    run("SELECT a from " + dbName + ".ptned WHERE b=1", driver);
+    verifyResults(ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver);
+    run("SELECT a from " + dbName + ".ptned WHERE b=2", driver);
+    verifyResults(ptn_data_2, driver);
+    run("SELECT a from " + dbName + ".ptned_empty", driver);
+    verifyResults(empty, driver);
+    run("SELECT * from " + dbName + ".unptned_empty", driver);
+    verifyResults(empty, driver);

     advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0,0,driver);
+    String replDumpId = getResult(0,1,true,driver);

     // Table dropped after "repl dump"
run("DROP TABLE " + dbName + ".unptned"); + run("DROP TABLE " + dbName + ".unptned", driver); // Partition droppped after "repl dump" - run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)"); + run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver); // File changed after "repl dump" Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2"); Path loc = new Path(p.getSd().getLocation()); @@ -254,25 +276,25 @@ public void testBasicWithCM() throws Exception { fs.delete(file, false); fs.copyFromLocalFile(new Path(ptn_locn_2_later), file); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - run("REPL STATUS " + dbName + "_dupe"); - verifyResults(new String[] {replDumpId}); + run("REPL STATUS " + dbName + "_dupe", driverMirror); + verifyResults(new String[] {replDumpId}, driverMirror); - run("SELECT * from " + dbName + "_dupe.unptned"); - verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); - verifyResults(ptn_data_1); + run("SELECT * from " + dbName + "_dupe.unptned", driverMirror); + verifyResults(unptn_data, driverMirror); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", driverMirror); + verifyResults(ptn_data_1, driverMirror); // Since partition(b=2) changed manually, Hive cannot find // it in original location and cmroot, thus empty - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + ".ptned_empty"); - verifyResults(empty); - run("SELECT * from " + dbName + ".unptned_empty"); - verifyResults(empty); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror); + verifyResults(empty, driverMirror); + run("SELECT a from " + dbName + ".ptned_empty", driver); + verifyResults(empty, driver); + run("SELECT * from " + dbName + ".unptned_empty", driver); + verifyResults(empty, driver); } @Test @@ -281,19 +303,19 @@ public void testIncrementalAdds() throws IOException { LOG.info("Testing "+testName); String dbName = testName + "_" + tid; - run("CREATE DATABASE " + dbName); + run("CREATE DATABASE " + dbName, driver); - run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); - run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); advanceDumpDir(); - run("REPL DUMP " + dbName); - String replDumpLocn = getResult(0,0); - String replDumpId = getResult(0,1,true); + run("REPL DUMP " + dbName, driver); + String replDumpLocn = getResult(0,0,driver); + String replDumpId = getResult(0,1,true,driver); LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + 
run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); String[] unptn_data = new String[]{ "eleven" , "twelve" }; String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; @@ -308,57 +330,57 @@ public void testIncrementalAdds() throws IOException { createTestDataFile(ptn_locn_1, ptn_data_1); createTestDataFile(ptn_locn_2, ptn_data_2); - verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); - verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); // Now, we load data into the tables, and see if an incremental // repl drop/load can duplicate it. - run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); - run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned"); - verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data); + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); - run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)"); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1",ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)"); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2); + run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1",ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); // Perform REPL-DUMP/LOAD advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId ); - String incrementalDumpLocn = getResult(0,0); - String incrementalDumpId = getResult(0,1,true); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + String incrementalDumpLocn = 
+    String incrementalDumpLocn = getResult(0,0,driver);
+    String incrementalDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'", driverMirror);

-    run("REPL STATUS " + dbName + "_dupe");
-    verifyResults(new String[] {incrementalDumpId});
+    run("REPL STATUS " + dbName + "_dupe", driverMirror);
+    verifyResults(new String[] {incrementalDumpId}, driverMirror);

     // VERIFY tables and partitions on destination for equivalence.

-    verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty);
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_empty", empty, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_empty", empty, driverMirror);

     // verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
     // TODO :this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
     // to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
     // fixing that.
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data, driverMirror);

-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2, driverMirror);

-    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2, driverMirror);
   }

   @Test
@@ -368,11 +390,11 @@ public void testDrops() throws IOException {
     LOG.info("Testing "+testName);
     String dbName = testName + "_" + tid;

-    run("CREATE DATABASE " + dbName);
-    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE DATABASE " + dbName, driver);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned3(a string) partitioned by (b int) STORED AS TEXTFILE", driver);

     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
@@ -387,62 +409,62 @@ public void testDrops() throws IOException {
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);
INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); - verifySetup("SELECT * from " + dbName + ".unptned", unptn_data); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); - verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); - verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)"); - verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=1", ptn_data_1); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)"); - verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')", driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')", driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2, driver); // At this point, we've set up all the tables and ptns we're going to test drops across // Replicate it first, and then we'll drop it on the source. 
     advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId});
-
-    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2);
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0,0,driver);
+    String replDumpId = getResult(0,1,true,driver);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+    verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId}, driverMirror);
+
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", ptn_data_1, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=2", ptn_data_2, driverMirror);

     // All tables good on destination, drop on source.
- run("DROP TABLE " + dbName + ".unptned"); - run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')"); - run("DROP TABLE " + dbName + ".ptned2"); - run("ALTER TABLE " + dbName + ".ptned3 DROP PARTITION (b=1)"); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty); - verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1); - verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty); - verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2); + run("DROP TABLE " + dbName + ".unptned", driver); + run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')", driver); + run("DROP TABLE " + dbName + ".ptned2", driver); + run("ALTER TABLE " + dbName + ".ptned3 DROP PARTITION (b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty, driver); + verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty, driver); + verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2, driver); // replicate the incremental drops advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId); - String postDropReplDumpLocn = getResult(0,0); - String postDropReplDumpId = getResult(0,1,true); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + String postDropReplDumpLocn = getResult(0,0,driver); + String postDropReplDumpId = getResult(0,1,true,driver); LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); // verify that drops were replicated. 
     // verify that drops were replicated. This can either be from tables or ptns
     // not existing, and thus, throwing a NoSuchObjectException, or returning nulls
@@ -450,7 +472,7 @@ public void testDrops() throws IOException {
     Exception e = null;
     try {
-      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      Table tbl = metaStoreClientMirror.getTable(dbName + "_dupe", "unptned");
       assertNull(tbl);
     } catch (TException te) {
       e = te;
@@ -458,14 +480,14 @@ public void testDrops() throws IOException {
     assertNotNull(e);
     assertEquals(NoSuchObjectException.class, e.getClass());

-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", empty, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3 WHERE b=1", empty, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned3", ptn_data_2, driverMirror);

     Exception e2 = null;
     try {
-      Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+      Table tbl = metaStoreClientMirror.getTable(dbName+"_dupe","ptned2");
       assertNull(tbl);
     } catch (TException te) {
       e2 = te;
@@ -481,10 +503,10 @@ public void testDropsWithCM() throws IOException {
     LOG.info("Testing "+testName);
     String dbName = testName + "_" + tid;

-    run("CREATE DATABASE " + dbName);
-    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE DATABASE " + dbName, driver);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE", driver);

     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
@@ -499,79 +521,79 @@ public void testDropsWithCM() throws IOException {
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);

-    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
-    run("SELECT * from " + dbName + ".unptned");
-    verifyResults(unptn_data);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
-    run("SELECT a from " + dbName + ".ptned WHERE b='1'");
-    verifyResults(ptn_data_1);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
-    run("SELECT a from " + dbName + ".ptned WHERE b='2'");
-    verifyResults(ptn_data_2);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
-    run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
-    verifyResults(ptn_data_1);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
-    run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
-    verifyResults(ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    run("SELECT * from " + dbName + ".unptned", driver);
+    verifyResults(unptn_data, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')", driver);
+    run("SELECT a from " + dbName + ".ptned WHERE b='1'", driver);
+    verifyResults(ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')", driver);
+    run("SELECT a from " + dbName + ".ptned WHERE b='2'", driver);
+    verifyResults(ptn_data_2, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')", driver);
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='1'", driver);
+    verifyResults(ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')", driver);
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='2'", driver);
+    verifyResults(ptn_data_2, driver);

     advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-
-    run("REPL STATUS " + dbName + "_dupe");
-    verifyResults(new String[] {replDumpId});
-
-    run("SELECT * from " + dbName + "_dupe.unptned");
-    verifyResults(unptn_data);
-    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
-    verifyResults(ptn_data_1);
-    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
-    verifyResults(ptn_data_2);
-    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
-    verifyResults(ptn_data_1);
-    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
-    verifyResults(ptn_data_2);
-
-    run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned");
-    run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned");
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0,0,driver);
+    String replDumpId = getResult(0,1,true,driver);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+
+    run("REPL STATUS " + dbName + "_dupe", driverMirror);
+    verifyResults(new String[] {replDumpId}, driverMirror);
+
+    run("SELECT * from " + dbName + "_dupe.unptned", driverMirror);
+    verifyResults(unptn_data, driverMirror);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", driverMirror);
+    verifyResults(ptn_data_1, driverMirror);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", driverMirror);
+    verifyResults(ptn_data_2, driverMirror);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", driverMirror);
+    verifyResults(ptn_data_1, driverMirror);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", driverMirror);
+    verifyResults(ptn_data_2, driverMirror);
+
+    run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned", driver);
+    run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned", driver);
     run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " +
-        dbName + ".ptned WHERE b='1'");
-    run("SELECT a from " + dbName + ".unptned_copy");
-    verifyResults(unptn_data);
-    run("SELECT a from " + dbName + ".ptned_copy");
-    verifyResults(ptn_data_1);
+        dbName + ".ptned WHERE b='1'", driver);
+    run("SELECT a from " + dbName + ".unptned_copy", driver);
+    verifyResults(unptn_data, driver);
+    run("SELECT a from " + dbName + ".ptned_copy", driver);
+    verifyResults(ptn_data_1, driver);
".unptned"); - run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')"); - run("DROP TABLE " + dbName + ".ptned2"); - run("SELECT a from " + dbName + ".ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + ".ptned"); - verifyResults(ptn_data_1); + dbName + ".ptned WHERE b='1'", driver); + run("SELECT a from " + dbName + ".unptned_copy", driver); + verifyResults(unptn_data, driver); + run("SELECT a from " + dbName + ".ptned_copy", driver); + verifyResults(ptn_data_1, driver); + + run("DROP TABLE " + dbName + ".unptned", driver); + run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')", driver); + run("DROP TABLE " + dbName + ".ptned2", driver); + run("SELECT a from " + dbName + ".ptned WHERE b=2", driver); + verifyResults(empty, driver); + run("SELECT a from " + dbName + ".ptned", driver); + verifyResults(ptn_data_1, driver); advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId); - String postDropReplDumpLocn = getResult(0,0); - String postDropReplDumpId = getResult(0,1,true); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + String postDropReplDumpLocn = getResult(0,0,driver); + String postDropReplDumpId = getResult(0,1,true,driver); LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); // Drop table after dump - run("DROP TABLE " + dbName + ".unptned_copy"); + run("DROP TABLE " + dbName + ".unptned_copy", driver); // Drop partition after dump - run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')"); + run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')", driver); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'", driverMirror); Exception e = null; try { - Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned"); + Table tbl = metaStoreClientMirror.getTable(dbName + "_dupe", "unptned"); assertNull(tbl); } catch (TException te) { e = te; @@ -579,14 +601,14 @@ public void testDropsWithCM() throws IOException { assertNotNull(e); assertEquals(NoSuchObjectException.class, e.getClass()); - run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); - verifyResults(empty); - run("SELECT a from " + dbName + "_dupe.ptned"); - verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", driverMirror); + verifyResults(empty, driverMirror); + run("SELECT a from " + dbName + "_dupe.ptned", driverMirror); + verifyResults(ptn_data_1, driverMirror); Exception e2 = null; try { - Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2"); + Table tbl = metaStoreClientMirror.getTable(dbName+"_dupe","ptned2"); assertNull(tbl); } catch (TException te) { e2 = te; @@ -594,10 +616,10 @@ public void testDropsWithCM() throws IOException { assertNotNull(e2); assertEquals(NoSuchObjectException.class, e.getClass()); - run("SELECT a from " + dbName + "_dupe.unptned_copy"); - verifyResults(unptn_data); - run("SELECT a from " + dbName + "_dupe.ptned_copy"); - verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.unptned_copy", driverMirror); + verifyResults(unptn_data, driverMirror); + run("SELECT a from " + dbName + "_dupe.ptned_copy", driverMirror); + verifyResults(ptn_data_1, driverMirror); } @Test @@ -607,11 +629,11 
@@ -607,11 +629,11 @@ public void testAlters() throws IOException {
     LOG.info("Testing "+testName);
     String dbName = testName + "_" + tid;

-    run("CREATE DATABASE " + dbName);
-    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
+    run("CREATE DATABASE " + dbName, driver);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE", driver);

     String[] unptn_data = new String[]{ "eleven" , "twelve" };
     String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
@@ -626,51 +648,51 @@ public void testAlters() throws IOException {
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);

-    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
-    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
-    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2");
-    verifySetup("SELECT * from " + dbName + ".unptned2", unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned2", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned2", unptn_data, driver);

-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')");
-    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'",ptn_data_1);
-    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')");
-    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='1'", ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", ptn_data_2, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='1'",ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b='2'", ptn_data_2, driver);

     // base tables set up, let's replicate them over
     advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-
-    run("REPL STATUS " + dbName + "_dupe");
-    verifyResults(new String[] {replDumpId});
-
-    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
-    verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1);
-    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2);
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0,0,driver);
+    String replDumpId = getResult(0,1,true,driver);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);
+
+    run("REPL STATUS " + dbName + "_dupe", driverMirror);
+    verifyResults(new String[] {replDumpId}, driverMirror);
+
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data, driverMirror);
+    verifySetup("SELECT * from " + dbName + "_dupe.unptned2", unptn_data, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'", ptn_data_2, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'", ptn_data_1, driverMirror);
+    verifySetup("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'", ptn_data_2, driverMirror);
     // tables have been replicated over, and verified to be identical. Now, we do a couple of
     // alters on the source

     // Rename unpartitioned table
-    run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn");
-    verifySetup("SELECT * from " + dbName + ".unptned_rn", unptn_data);
+    run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_rn", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_rn", unptn_data, driver);

     // Alter unpartitioned table set table property
     String testKey = "blah";
     String testVal = "foo";
-    run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+    run("ALTER TABLE " + dbName + ".unptned2 SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')", driver);
     if (VERIFY_SETUP_STEPS){
       try {
         Table unptn2 = metaStoreClient.getTable(dbName,"unptned2");
@@ -682,12 +704,12 @@ public void testAlters() throws IOException {
       }
     }

     // alter partitioned table, rename partition
-    run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty);
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=22", ptn_data_2);
+    run("ALTER TABLE " + dbName + ".ptned PARTITION (b='2') RENAME TO PARTITION (b='22')", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", empty, driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=22", ptn_data_2, driver);

     // alter partitioned table set table property
-    run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')");
+    run("ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('" + testKey + "' = '" + testVal + "')", driver);
     if (VERIFY_SETUP_STEPS){
       try {
         Table ptned = metaStoreClient.getTable(dbName,"ptned");
@@ -711,62 +733,62 @@ public void testAlters() throws IOException {
     }

     // rename partitioned table
-    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
-    run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn");
-    verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2, driver);
+    run("ALTER TABLE " + dbName + ".ptned2 RENAME TO " + dbName + ".ptned2_rn", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned2_rn WHERE b=2", ptn_data_2, driver);

     // All alters done, now we replicate them over.
     advanceDumpDir();
-    run("REPL DUMP " + dbName + " FROM " + replDumpId);
-    String postAlterReplDumpLocn = getResult(0,0);
-    String postAlterReplDumpId = getResult(0,1,true);
+    run("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
+    String postAlterReplDumpLocn = getResult(0,0,driver);
+    String postAlterReplDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}->{}", postAlterReplDumpLocn, replDumpId, postAlterReplDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'");
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postAlterReplDumpLocn + "'", driverMirror);

     // Replication done, we now do the following verifications:

     // verify that unpartitioned table rename succeeded.
     Exception e = null;
     try {
-      Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "unptned");
+      Table tbl = metaStoreClientMirror.getTable(dbName + "_dupe" , "unptned");
       assertNull(tbl);
     } catch (TException te) {
       e = te;
     }
     assertNotNull(e);
     assertEquals(NoSuchObjectException.class, e.getClass());
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data);
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_rn", unptn_data, driverMirror);

     // verify that the unpartitioned table property set succeeded.
     try {
-      Table unptn2 = metaStoreClient.getTable(dbName + "_dupe" , "unptned2");
+      Table unptn2 = metaStoreClientMirror.getTable(dbName + "_dupe" , "unptned2");
       assertTrue(unptn2.getParameters().containsKey(testKey));
       assertEquals(testVal,unptn2.getParameters().get(testKey));
     } catch (TException te) {
       assertNull(te);
     }

-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", empty, driverMirror);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=22", ptn_data_2, driverMirror);

     // verify that ptned table rename succeeded.
     Exception e2 = null;
     try {
-      Table tbl = metaStoreClient.getTable(dbName + "_dupe" , "ptned2");
+      Table tbl = metaStoreClientMirror.getTable(dbName + "_dupe" , "ptned2");
       assertNull(tbl);
     } catch (TException te) {
       e2 = te;
     }
     assertNotNull(e2);
     assertEquals(NoSuchObjectException.class, e2.getClass());
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2);
+    verifyRun("SELECT a from " + dbName + "_dupe.ptned2_rn WHERE b=2", ptn_data_2, driverMirror);

     // verify that ptned table property set worked
     try {
-      Table ptned = metaStoreClient.getTable(dbName + "_dupe" , "ptned");
+      Table ptned = metaStoreClientMirror.getTable(dbName + "_dupe" , "ptned");
       assertTrue(ptned.getParameters().containsKey(testKey));
       assertEquals(testVal, ptned.getParameters().get(testKey));
     } catch (TException te) {
@@ -777,7 +799,7 @@ public void testAlters() throws IOException {
     try {
       List<String> ptnVals1 = new ArrayList<String>();
       ptnVals1.add("1");
-      Partition ptn1 = metaStoreClient.getPartition(dbName + "_dupe", "ptned", ptnVals1);
+      Partition ptn1 = metaStoreClientMirror.getPartition(dbName + "_dupe", "ptned", ptnVals1);
       assertTrue(ptn1.getParameters().containsKey(testKey));
       assertEquals(testVal,ptn1.getParameters().get(testKey));
     } catch (TException te) {
@@ -800,20 +822,20 @@ public void testIncrementalInserts() throws IOException {
     LOG.info("Testing " + testName);
     String dbName = testName + "_" + tid;

-    run("CREATE DATABASE " + dbName);
+    run("CREATE DATABASE " + dbName, driver);

-    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
-    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
     run("CREATE TABLE " + dbName
-        + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+        + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);

     advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0, 0);
-    String replDumpId = getResult(0, 1, true);
+    run("REPL DUMP " + dbName, driver);
+    String replDumpLocn = getResult(0, 0, driver);
+    String replDumpId = getResult(0, 1, true, driver);
     LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror);

     String[] unptn_data = new String[] { "eleven", "twelve" };
     String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
@@ -828,53 +850,53 @@ public void testIncrementalInserts() throws IOException {
     createTestDataFile(ptn_locn_1, ptn_data_1);
     createTestDataFile(ptn_locn_2, ptn_data_2);

-    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
-    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver);

-    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
-    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data);
-    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
-    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
-    verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver);

     advanceDumpDir();
-    run("REPL DUMP " + dbName + " FROM " + replDumpId);
-    String incrementalDumpLocn = getResult(0, 0);
-    String incrementalDumpId = getResult(0, 1, true);
+    run("REPL DUMP " + dbName + " FROM " + replDumpId, driver);
+    String incrementalDumpLocn = getResult(0, 0, driver);
+    String incrementalDumpId = getResult(0, 1, true, driver);
     LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
+    printOutput(driverMirror);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror);
+    verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data, driverMirror);

     run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
-        + ".ptned PARTITION(b=1)");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1);
+        + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver);
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName
-        + ".ptned PARTITION(b=2)");
-    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2);
+        + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver);

     run("CREATE TABLE " + dbName
-        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName - + ".ptned WHERE b=1"); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1); + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1, driver); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName - + ".ptned WHERE b=2"); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2); + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + replDumpId); - incrementalDumpLocn = getResult(0, 0); - incrementalDumpId = getResult(0, 1, true); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + incrementalDumpLocn = getResult(0, 0, driver); + incrementalDumpId = getResult(0, 1, true, driver); LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1); - verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2, driverMirror); } @Test @@ -903,12 +925,12 @@ public void testStatus() throws IOException { LOG.info("Testing " + testName); String dbName = testName + "_" + tid; - run("CREATE DATABASE " + dbName); + run("CREATE DATABASE " + dbName, driver); advanceDumpDir(); - run("REPL DUMP " + dbName); - String lastReplDumpLocn = getResult(0, 0); - String lastReplDumpId = getResult(0, 1, true); - run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'"); + run("REPL DUMP " + dbName, driver); + String lastReplDumpLocn = getResult(0, 0, driver); + String lastReplDumpId = getResult(0, 1, true, driver); + run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'", driverMirror); // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs. // Both db-level and table-level repl.last.id must be updated. @@ -958,16 +980,91 @@ public void testStatus() throws IOException { } + @Test + public void testConstraints() throws IOException { + String testName = "constraints"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName, driver); + + run("CREATE TABLE " + dbName + ".tbl1(a string, b string, primary key (a) disable novalidate rely)", driver); + run("CREATE TABLE " + dbName + ".tbl2(a string, b string, foreign key (a, b) references " + dbName + ".tbl1(a, b) disable novalidate)", driver); + + advanceDumpDir(); + run("REPL DUMP " + dbName, driver); + String replDumpLocn = getResult(0, 0, driver); + String replDumpId = getResult(0, 1, true, driver); + LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); + + // bootstrap replication for constraint is not implemented. 
Will verify it works once done + try { + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl1")); + assertTrue(pks.isEmpty()); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl1")); + assertTrue(fks.isEmpty()); + } catch (TException te) { + assertNull(te); + } + + run("CREATE TABLE " + dbName + ".tbl3(a string, b string, primary key (a) disable novalidate rely)", driver); + run("CREATE TABLE " + dbName + ".tbl4(a string, b string, foreign key (a, b) references " + dbName + ".tbl3(a, b) disable novalidate)", driver); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId, driver); + String incrementalDumpLocn = getResult(0, 0, driver); + String incrementalDumpId = getResult(0, 1, true, driver); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + + String pkName = null; + String fkName = null; + try { + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl3")); + assertEquals(pks.size(), 1); + pkName = pks.get(0).getPk_name(); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl4")); + assertEquals(fks.size(), 1); + fkName = fks.get(0).getFk_name(); + } catch (TException te) { + assertNull(te); + } + + run("ALTER TABLE " + dbName + ".tbl3 DROP CONSTRAINT `" + pkName + "`", driver); + run("ALTER TABLE " + dbName + ".tbl4 DROP CONSTRAINT `" + fkName + "`", driver); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + incrementalDumpId, driver); + incrementalDumpLocn = getResult(0, 0, driver); + incrementalDumpId = getResult(0, 1, true, driver); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + printOutput(driverMirror); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'", driverMirror); + + try { + List pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl3")); + assertTrue(pks.isEmpty()); + List fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl4")); + assertTrue(fks.isEmpty()); + } catch (TException te) { + assertNull(te); + } + } + private String verifyAndReturnDbReplStatus(String dbName, String tblName, String prevReplDumpId, String cmd) throws IOException { - run(cmd); + run(cmd, driver); advanceDumpDir(); - run("REPL DUMP " + dbName + " FROM " + prevReplDumpId); - String lastDumpLocn = getResult(0, 0); - String lastReplDumpId = getResult(0, 1, true); - run("REPL LOAD " + dbName + "_dupe FROM '" + lastDumpLocn + "'"); - verifyRun("REPL STATUS " + dbName + "_dupe", lastReplDumpId); + run("REPL DUMP " + dbName + " FROM " + prevReplDumpId, driver); + String lastDumpLocn = getResult(0, 0, driver); + String lastReplDumpId = getResult(0, 1, true, driver); + run("REPL LOAD " + dbName + "_dupe FROM '" + lastDumpLocn + "'", driverMirror); + verifyRun("REPL STATUS " + dbName + "_dupe", lastReplDumpId, driverMirror); if (tblName != null){ - verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId); + verifyRun("REPL STATUS " + dbName + "_dupe." 
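/*
 * REPL STATUS reads the replication state (repl.last.id) that REPL LOAD records on the
 * target, so these helpers assert that the mirror has caught up to the id the dump reported.
 * A sketch of the contract being checked (values illustrative):
 *
 *   run("REPL DUMP " + dbName + " FROM " + prevId, driver);
 *   String dumpId = getResult(0, 1, true, driver);                       // e.g. "57"
 *   run("REPL LOAD " + dbName + "_dupe FROM '" + locn + "'", driverMirror);
 *   verifyRun("REPL STATUS " + dbName + "_dupe", dumpId, driverMirror);  // mirror now at 57
 */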
+ tblName, lastReplDumpId, driverMirror); } assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); return lastReplDumpId; @@ -976,27 +1073,27 @@ private String verifyAndReturnDbReplStatus(String dbName, String tblName, String // Tests that doing a table-level REPL LOAD updates table repl.last.id, but not db-level repl.last.id private String verifyAndReturnTblReplStatus( String dbName, String tblName, String lastDbReplDumpId, String prevReplDumpId, String cmd) throws IOException { - run(cmd); + run(cmd, driver); advanceDumpDir(); - run("REPL DUMP " + dbName + "."+ tblName + " FROM " + prevReplDumpId); - String lastDumpLocn = getResult(0, 0); - String lastReplDumpId = getResult(0, 1, true); - run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'"); - verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId); - verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId); + run("REPL DUMP " + dbName + "."+ tblName + " FROM " + prevReplDumpId, driver); + String lastDumpLocn = getResult(0, 0, driver); + String lastReplDumpId = getResult(0, 1, true, driver); + run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'", driverMirror); + verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId, driverMirror); + verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId, driverMirror); assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); return lastReplDumpId; } - private String getResult(int rowNum, int colNum) throws IOException { - return getResult(rowNum,colNum,false); + private String getResult(int rowNum, int colNum, Driver myDriver) throws IOException { + return getResult(rowNum,colNum,false,myDriver); } - private String getResult(int rowNum, int colNum, boolean reuse) throws IOException { + private String getResult(int rowNum, int colNum, boolean reuse, Driver myDriver) throws IOException { if (!reuse) { lastResults = new ArrayList(); try { - driver.getResults(lastResults); + myDriver.getResults(lastResults); } catch (CommandNeedRetryException e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1006,8 +1103,8 @@ private String getResult(int rowNum, int colNum, boolean reuse) throws IOExcepti return (lastResults.get(rowNum).split("\\t"))[colNum]; } - private void verifyResults(String[] data) throws IOException { - List results = getOutput(); + private void verifyResults(String[] data, Driver myDriver) throws IOException { + List results = getOutput(myDriver); LOG.info("Expecting {}",data); LOG.info("Got {}",results); assertEquals(data.length,results.size()); @@ -1016,10 +1113,10 @@ private void verifyResults(String[] data) throws IOException { } } - private List getOutput() throws IOException { + private List getOutput(Driver myDriver) throws IOException { List results = new ArrayList(); try { - driver.getResults(results); + myDriver.getResults(results); } catch (CommandNeedRetryException e) { LOG.warn(e.getMessage(),e); throw new RuntimeException(e); @@ -1027,31 +1124,31 @@ private void verifyResults(String[] data) throws IOException { return results; } - private void printOutput() throws IOException { - for (String s : getOutput()){ + private void printOutput(Driver myDriver) throws IOException { + for (String s : getOutput(myDriver)){ LOG.info(s); } } - private void verifySetup(String cmd, String[] data) throws IOException { + private void verifySetup(String cmd, String[] data, Driver myDriver) throws IOException { if (VERIFY_SETUP_STEPS){ - run(cmd); - verifyResults(data); + run(cmd, 
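/*
 * The helper refactor below threads an explicit Driver through every utility because the
 * suite now drives two independent Hive instances: `driver` executes against the source
 * warehouse and `driverMirror` against the replica, each with its own metastore client
 * (metaStoreClient / metaStoreClientMirror). Intended usage, with the field names used
 * throughout this test:
 *
 *   run("CREATE TABLE " + dbName + ".t(a string)", driver);                  // mutate source
 *   run("REPL LOAD " + dbName + "_dupe FROM '" + locn + "'", driverMirror);  // apply on replica
 *   verifyRun("SELECT a from " + dbName + "_dupe.t", data, driverMirror);    // assert on replica
 */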
myDriver); + verifyResults(data, myDriver); } } - private void verifyRun(String cmd, String data) throws IOException { - verifyRun(cmd, new String[] { data }); + private void verifyRun(String cmd, String data, Driver myDriver) throws IOException { + verifyRun(cmd, new String[] { data }, myDriver); } - private void verifyRun(String cmd, String[] data) throws IOException { - run(cmd); - verifyResults(data); + private void verifyRun(String cmd, String[] data, Driver myDriver) throws IOException { + run(cmd, myDriver); + verifyResults(data, myDriver); } - private static void run(String cmd) throws RuntimeException { + private static void run(String cmd, Driver myDriver) throws RuntimeException { try { - run(cmd,false); // default arg-less run simply runs, and does not care about failure + run(cmd,false, myDriver); // default arg-less run simply runs, and does not care about failure } catch (AssertionError ae){ // Hive code has AssertionErrors in some cases - we want to record what happens LOG.warn("AssertionError:",ae); @@ -1059,10 +1156,10 @@ private static void run(String cmd) throws RuntimeException { } } - private static boolean run(String cmd, boolean errorOnFail) throws RuntimeException { + private static boolean run(String cmd, boolean errorOnFail, Driver myDriver) throws RuntimeException { boolean success = false; try { - CommandProcessorResponse ret = driver.run(cmd); + CommandProcessorResponse ret = myDriver.run(cmd); success = (ret.getException() == null); if (!success){ LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), ret.getErrorMessage(), cmd); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 13d0aab..65102ff 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -77,8 +77,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; @@ -86,6 +88,7 @@ import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropConstraintEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropIndexEvent; @@ -1434,14 +1437,37 @@ private void create_table_core(final RawStore ms, final Table tbl, if (primaryKeys == null && foreignKeys == null) { ms.createTable(tbl); } else { - ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); + // Set constraint name if null before sending to listener + List constraintNames = ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys); + int primaryKeySize = 0; + if (primaryKeys != null) { + primaryKeySize = primaryKeys.size(); + for (int i = 0; i < primaryKeySize; 
i++) { + if (primaryKeys.get(i).getPk_name() == null) { + primaryKeys.get(i).setPk_name(constraintNames.get(i)); + } + } + } + if (foreignKeys != null) { + for (int i = 0; i < foreignKeys.size(); i++) { + if (foreignKeys.get(i).getFk_name() == null) { + foreignKeys.get(i).setFk_name(constraintNames.get(primaryKeySize + i)); + } + } + } } if (transactionalListeners.size() > 0) { CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this); createTableEvent.setEnvironmentContext(envContext); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onCreateTable(createTableEvent); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onCreateTable(createTableEvent); + if (primaryKeys != null && primaryKeys.size() > 0) { + transactionalListener.onAddPrimaryKey(new AddPrimaryKeyEvent(primaryKeys, true, this)); + } + if (foreignKeys != null && foreignKeys.size() > 0) { + transactionalListener.onAddForeignKey(new AddForeignKeyEvent(foreignKeys, true, this)); + } } } @@ -1458,6 +1484,14 @@ private void create_table_core(final RawStore ms, final Table tbl, new CreateTableEvent(tbl, success, this); createTableEvent.setEnvironmentContext(envContext); listener.onCreateTable(createTableEvent); + if (primaryKeys != null && primaryKeys.size() > 0) { + AddPrimaryKeyEvent addPrimaryKeyEvent = new AddPrimaryKeyEvent(primaryKeys, true, this); + listener.onAddPrimaryKey(addPrimaryKeyEvent); + } + if (foreignKeys != null && foreignKeys.size() > 0) { + AddForeignKeyEvent addForeignKeyEvent = new AddForeignKeyEvent(foreignKeys, true, this); + listener.onAddForeignKey(addForeignKeyEvent); + } } } } @@ -1536,8 +1570,17 @@ public void drop_constraint(DropConstraintRequest req) boolean success = false; Exception ex = null; try { - getMS().dropConstraint(dbName, tableName, constraintName); - success = true; + RawStore ms = getMS(); + ms.openTransaction(); + ms.dropConstraint(dbName, tableName, constraintName); + if (transactionalListeners.size() > 0) { + DropConstraintEvent dropConstraintEvent = new DropConstraintEvent(dbName, + tableName, constraintName, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onDropConstraint(dropConstraintEvent); + } + } + success = ms.commitTransaction(); } catch (NoSuchObjectException e) { ex = e; throw new InvalidObjectException(e.getMessage()); @@ -1551,6 +1594,11 @@ public void drop_constraint(DropConstraintRequest req) throw newMetaException(e); } } finally { + for (MetaStoreEventListener listener : listeners) { + DropConstraintEvent dropConstraintEvent = new DropConstraintEvent(dbName, + tableName, constraintName, true, this); + listener.onDropConstraint(dropConstraintEvent); + } endFunction("drop_constraint", success, ex, constraintName); } } @@ -1565,8 +1613,24 @@ public void add_primary_key(AddPrimaryKeyRequest req) boolean success = false; Exception ex = null; try { - getMS().addPrimaryKeys(primaryKeyCols); - success = true; + RawStore ms = getMS(); + ms.openTransaction(); + List<String> constraintNames = ms.addPrimaryKeys(primaryKeyCols); + // Set primary key name if null before sending to listener + for (int i = 0; i < primaryKeyCols.size(); i++) { + if (primaryKeyCols.get(i).getPk_name() == null) { + primaryKeyCols.get(i).setPk_name(constraintNames.get(i)); + } + } + if (transactionalListeners.size() > 0) { + if (primaryKeyCols != null && 
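/*
 * The handlers above all follow the same notification pattern: open a RawStore transaction,
 * apply the metadata change, fire the event at the *transactional* listeners inside the
 * transaction (so a DbNotificationListener row commits or rolls back atomically with the
 * change), then commit; the ordinary listeners are notified afterwards in the finally block.
 * Schematically:
 *
 *   RawStore ms = getMS();
 *   ms.openTransaction();
 *   ms.addPrimaryKeys(pks);                        // metadata change
 *   for (MetaStoreEventListener l : transactionalListeners) {
 *     l.onAddPrimaryKey(new AddPrimaryKeyEvent(pks, true, this));   // same transaction
 *   }
 *   success = ms.commitTransaction();
 *   // finally: notify the non-transactional listeners
 */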
primaryKeyCols.size() > 0) { + AddPrimaryKeyEvent addPrimaryKeyEvent = new AddPrimaryKeyEvent(primaryKeyCols, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddPrimaryKey(addPrimaryKeyEvent); + } + } + } + success = ms.commitTransaction(); } catch (Exception e) { ex = e; if (e instanceof MetaException) { @@ -1577,6 +1641,12 @@ public void add_primary_key(AddPrimaryKeyRequest req) throw newMetaException(e); } } finally { + if (primaryKeyCols != null && primaryKeyCols.size() > 0) { + for (MetaStoreEventListener listener : listeners) { + AddPrimaryKeyEvent addPrimaryKeyEvent = new AddPrimaryKeyEvent(primaryKeyCols, true, this); + listener.onAddPrimaryKey(addPrimaryKeyEvent); + } + } endFunction("add_primary_key", success, ex, constraintName); } } @@ -1591,8 +1661,24 @@ public void add_foreign_key(AddForeignKeyRequest req) boolean success = false; Exception ex = null; try { - getMS().addForeignKeys(foreignKeyCols); - success = true; + RawStore ms = getMS(); + ms.openTransaction(); + List constraintNames = ms.addForeignKeys(foreignKeyCols); + // Set foreign key name if null before sending to listener + for (int i = 0; i < foreignKeyCols.size(); i++) { + if (foreignKeyCols.get(i).getFk_name() == null) { + foreignKeyCols.get(i).setFk_name(constraintNames.get(i)); + } + } + if (transactionalListeners.size() > 0) { + if (foreignKeyCols != null && foreignKeyCols.size() > 0) { + AddForeignKeyEvent addForeignKeyEvent = new AddForeignKeyEvent(foreignKeyCols, true, this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onAddForeignKey(addForeignKeyEvent); + } + } + } + success = ms.commitTransaction(); } catch (Exception e) { ex = e; if (e instanceof MetaException) { @@ -1603,6 +1689,12 @@ public void add_foreign_key(AddForeignKeyRequest req) throw newMetaException(e); } } finally { + if (foreignKeyCols != null && foreignKeyCols.size() > 0) { + for (MetaStoreEventListener listener : listeners) { + AddForeignKeyEvent addForeignKeyEvent = new AddForeignKeyEvent(foreignKeyCols, true, this); + listener.onAddForeignKey(addForeignKeyEvent); + } + } endFunction("add_foreign_key", success, ex, constraintName); } } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index b0defb5..784ba90 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -21,7 +21,9 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; +import org.apache.hadoop.hive.metastore.events.AddPrimaryKeyEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; @@ -30,6 +32,7 @@ import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropConstraintEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; 
import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropIndexEvent; @@ -171,6 +174,27 @@ public void onInsert(InsertEvent insertEvent) throws MetaException { } + /** + * @param addPrimaryKeyEvent add primary key event + * @throws MetaException + */ + public void onAddPrimaryKey(AddPrimaryKeyEvent addPrimaryKeyEvent) throws MetaException { + } + + /** + * @param addForeignKeyEvent add foreign key event + * @throws MetaException + */ + public void onAddForeignKey(AddForeignKeyEvent addForeignKeyEvent) throws MetaException { + } + + /** + * @param dropConstraintEvent drop constraint event + * @throws MetaException + */ + public void onDropConstraint(DropConstraintEvent dropConstraintEvent) throws MetaException { + } + @Override public Configuration getConf() { return this.conf; diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 778615d..dff7e0d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -996,7 +996,7 @@ public boolean dropType(String typeName) { } @Override - public void createTableWithConstraints(Table tbl, + public List createTableWithConstraints(Table tbl, List primaryKeys, List foreignKeys) throws InvalidObjectException, MetaException { boolean success = false; @@ -1006,9 +1006,11 @@ public void createTableWithConstraints(Table tbl, // Add primary keys and foreign keys. // We need not do a deep retrieval of the Table Column Descriptor while persisting the PK/FK // since this transaction involving create table is not yet committed. - addPrimaryKeys(primaryKeys, false); - addForeignKeys(foreignKeys, false); + List constraintNames = new ArrayList(); + constraintNames.addAll(addPrimaryKeys(primaryKeys, false)); + constraintNames.addAll(addForeignKeys(foreignKeys, false)); success = commitTransaction(); + return constraintNames; } finally { if (!success) { rollbackTransaction(); @@ -3593,14 +3595,15 @@ private String generateConstraintName(String... parameters) throws MetaException } @Override - public void addForeignKeys( + public List addForeignKeys( List fks) throws InvalidObjectException, MetaException { - addForeignKeys(fks, true); + return addForeignKeys(fks, true); } - private void addForeignKeys( + private List addForeignKeys( List fks, boolean retrieveCD) throws InvalidObjectException, MetaException { + List fkNames = new ArrayList(); List mpkfks = new ArrayList(); String currentConstraintName = null; @@ -3649,6 +3652,7 @@ private void addForeignKeys( } else { currentConstraintName = fks.get(i).getFk_name(); } + fkNames.add(currentConstraintName); Integer updateRule = fks.get(i).getUpdate_rule(); Integer deleteRule = fks.get(i).getDelete_rule(); int enableValidateRely = (fks.get(i).isEnable_cstr() ? 
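/*
 * Because the new MetaStoreEventListener hooks above default to no-ops, existing listener
 * implementations keep compiling and only override what they care about. A minimal consumer
 * sketch (class name hypothetical; assumes an SLF4J logger field LOG):
 *
 *   public class ConstraintAuditListener extends MetaStoreEventListener {
 *     public ConstraintAuditListener(Configuration config) { super(config); }
 *
 *     @Override
 *     public void onDropConstraint(DropConstraintEvent event) throws MetaException {
 *       LOG.info("Dropped constraint {} on {}.{}",
 *           event.getConstraintName(), event.getDbName(), event.getTableName());
 *     }
 *   }
 */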
4 : 0) + @@ -3670,16 +3674,18 @@ private void addForeignKeys( mpkfks.add(mpkfk); } pm.makePersistentAll(mpkfks); + return fkNames; } @Override - public void addPrimaryKeys(List pks) throws InvalidObjectException, + public List addPrimaryKeys(List pks) throws InvalidObjectException, MetaException { - addPrimaryKeys(pks, true); + return addPrimaryKeys(pks, true); } - private void addPrimaryKeys(List pks, boolean retrieveCD) throws InvalidObjectException, + private List addPrimaryKeys(List pks, boolean retrieveCD) throws InvalidObjectException, MetaException { + List pkNames = new ArrayList(); List mpks = new ArrayList(); String constraintName = null; @@ -3711,7 +3717,7 @@ private void addPrimaryKeys(List pks, boolean retrieveCD) throws } else { constraintName = pks.get(i).getPk_name(); } - + pkNames.add(constraintName); int enableValidateRely = (pks.get(i).isEnable_cstr() ? 4 : 0) + (pks.get(i).isValidate_cstr() ? 2 : 0) + (pks.get(i).isRely_cstr() ? 1 : 0); MConstraint mpk = new MConstraint( @@ -3730,6 +3736,7 @@ private void addPrimaryKeys(List pks, boolean retrieveCD) throws mpks.add(mpk); } pm.makePersistentAll(mpks); + return pkNames; } @Override diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index 6f4f031..f5e6fe0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -688,12 +688,12 @@ void getFileMetadataByExpr(List fileIds, FileMetadataExprType type, byte[] String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) throws MetaException; - void createTableWithConstraints(Table tbl, List primaryKeys, + List createTableWithConstraints(Table tbl, List primaryKeys, List foreignKeys) throws InvalidObjectException, MetaException; void dropConstraint(String dbName, String tableName, String constraintName) throws NoSuchObjectException; - void addPrimaryKeys(List pks) throws InvalidObjectException, MetaException; + List addPrimaryKeys(List pks) throws InvalidObjectException, MetaException; - void addForeignKeys(List fks) throws InvalidObjectException, MetaException; + List addForeignKeys(List fks) throws InvalidObjectException, MetaException; } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddForeignKeyEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddForeignKeyEvent.java new file mode 100644 index 0000000..1dc9588 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddForeignKeyEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.events; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; + +public class AddForeignKeyEvent extends ListenerEvent { + private final List fks; + + public AddForeignKeyEvent(List fks, boolean status, HMSHandler handler) { + super(status, handler); + this.fks = fks; + } + + public List getForeignKeyCols() { + return fks; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPrimaryKeyEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPrimaryKeyEvent.java new file mode 100644 index 0000000..cb0f562 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPrimaryKeyEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.events; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; + +public class AddPrimaryKeyEvent extends ListenerEvent { + + private final List pks; + + public AddPrimaryKeyEvent(List pks, boolean status, HMSHandler handler) { + super(status, handler); + this.pks = pks; + } + + public List getPrimaryKeyCols() { + return pks; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropConstraintEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropConstraintEvent.java new file mode 100644 index 0000000..5396863 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/DropConstraintEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; + +public class DropConstraintEvent extends ListenerEvent { + + private final String dbName; + private final String tableName; + private final String constraintName; + public DropConstraintEvent(String dbName, String tableName, String constraintName, + boolean status, HMSHandler handler) { + super(status, handler); + this.dbName = dbName; + this.tableName = tableName; + this.constraintName = constraintName; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getConstraintName() { + return constraintName; + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 6593fa6..76fa4f1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheLoader; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.slf4j.Logger; @@ -74,6 +75,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan; +import org.apache.hadoop.hive.metastore.model.MConstraint; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; @@ -93,6 +95,8 @@ import java.util.Map.Entry; import java.util.Set; +import javax.jdo.Query; + /** * Implementation of RawStore that stores data in HBase */ @@ -2733,16 +2737,18 @@ public void putFileMetadata(List fileIds, List metadata, } @Override - public void createTableWithConstraints(Table tbl, List primaryKeys, + public List createTableWithConstraints(Table tbl, List primaryKeys, List foreignKeys) throws InvalidObjectException, MetaException { boolean commit = false; openTransaction(); try { createTable(tbl); - if (primaryKeys != null) addPrimaryKeys(primaryKeys); - if (foreignKeys != null) addForeignKeys(foreignKeys); + List constraintNames = new ArrayList(); + if (primaryKeys != null) constraintNames.addAll(addPrimaryKeys(primaryKeys)); + if (foreignKeys != null) constraintNames.addAll(addForeignKeys(foreignKeys)); commit = true; + return constraintNames; } finally { commitOrRoleBack(commit); } @@ -2790,8 +2796,50 @@ public void dropConstraint(String dbName, String tableName, String constraintNam } } + private String generateConstraintName(String dbName, String tableName, String... parameters) throws MetaException { + String[] allParams = new String[parameters.length + 2]; + allParams[0] = dbName; + allParams[1] = tableName; + System.arraycopy(parameters, 0, allParams, 2, parameters.length); + int hashcode = ArrayUtils.toString(parameters).hashCode(); + int counter = 0; + final int MAX_RETRIES = 10; + while (counter < MAX_RETRIES) { + String currName = (parameters.length == 0 ? 
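/*
 * Name generation here mirrors ObjectStore.generateConstraintName(): the last input
 * parameter (or "constraint_" when none is given) is suffixed with a hash of the parameters,
 * the current time in millis, and a retry counter, probing up to MAX_RETRIES times for an
 * unused name. Illustrative shape only -- actual values vary per run:
 *
 *   generateConstraintName("default", "tbl1", "a", "pk")
 *     -> "pk_-1255093618_1485470941000_0"
 */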
"constraint_" : parameters[parameters.length-1]) + + "_" + hashcode + "_" + System.currentTimeMillis() + "_" + (counter++); + if (!constraintNameAlreadyExists(dbName, tableName, currName)) { + return currName; + } + } + throw new MetaException("Error while trying to generate the constraint name for " + ArrayUtils.toString(parameters)); + } + + private boolean constraintNameAlreadyExists(String dbName, String tableName, String constraintName) throws MetaException { + try { + List pk = getHBase().getPrimaryKey(dbName, tableName); + if (pk != null && pk.size() > 0 && pk.get(0).getPk_name().equals(constraintName)) { + return true; + } + + List fks = getHBase().getForeignKeys(dbName, tableName); + if (fks != null && fks.size() > 0) { + for (SQLForeignKey fkcol : fks) { + if (!fkcol.getFk_name().equals(constraintName)) { + return true; + } + } + return false; + } + } catch (IOException e) { + throw new MetaException("Error checking duplicate constraint " + dbName + "." + tableName); + } + return false; + } + @Override - public void addPrimaryKeys(List pks) throws InvalidObjectException, MetaException { + public List addPrimaryKeys(List pks) throws InvalidObjectException, MetaException { + List pkNames = new ArrayList(); + String constraintName = null; boolean commit = false; openTransaction(); try { @@ -2801,8 +2849,20 @@ public void addPrimaryKeys(List pks) throws InvalidObjectExceptio throw new MetaException(" Primary key already exists for: " + tableNameForErrorMsg(pks.get(0).getTable_db(), pks.get(0).getTable_name())); } + for (int i = 0; i < pks.size(); i++) { + if (pks.get(i).getPk_name() == null) { + if (pks.get(i).getKey_seq() == 1) { + constraintName = generateConstraintName(pks.get(i).getTable_db(), pks.get(i).getTable_name(), + pks.get(i).getColumn_name(), "pk"); + } + } else { + constraintName = pks.get(i).getPk_name(); + } + pkNames.add(constraintName); + } getHBase().putPrimaryKey(pks); commit = true; + return pkNames; } catch (IOException e) { LOG.error("Error writing primary key", e); throw new MetaException("Error writing primary key: " + e.getMessage()); @@ -2812,10 +2872,31 @@ public void addPrimaryKeys(List pks) throws InvalidObjectExceptio } @Override - public void addForeignKeys(List fks) throws InvalidObjectException, MetaException { + public List addForeignKeys(List fks) throws InvalidObjectException, MetaException { + List fkNames = new ArrayList(); + String currentConstraintName = null; boolean commit = false; openTransaction(); try { + for (int i = 0; i < fks.size(); i++) { + if (fks.get(i).getFk_name() == null) { + // When there is no explicit foreign key name associated with the constraint and the key is composite, + // we expect the foreign keys to be send in order in the input list. + // Otherwise, the below code will break. + // If this is the first column of the FK constraint, generate the foreign key name + // NB: The below code can result in race condition where duplicate names can be generated (in theory). + // However, this scenario can be ignored for practical purposes because of + // the uniqueness of the generated constraint name. 
+ if (fks.get(i).getKey_seq() == 1) { + currentConstraintName = generateConstraintName(fks.get(i).getFktable_db(), fks.get(i).getFktable_name(), + fks.get(i).getPktable_db(), fks.get(i).getPktable_name(), + fks.get(i).getPkcolumn_name(), fks.get(i).getFkcolumn_name(), "fk"); + } + } else { + currentConstraintName = fks.get(i).getFk_name(); + } + fkNames.add(currentConstraintName); + } // Fetch the existing keys (if any) and add in these new ones List existing = getHBase().getForeignKeys(fks.get(0).getFktable_db(), fks.get(0).getFktable_name()); @@ -2823,6 +2904,7 @@ public void addForeignKeys(List fks) throws InvalidObjectExceptio existing.addAll(fks); getHBase().putForeignKeys(existing); commit = true; + return fkNames; } catch (IOException e) { LOG.error("Error writing foreign keys", e); throw new MetaException("Error writing foreign keys: " + e.getMessage()); diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddForeignKeyMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddForeignKeyMessage.java new file mode 100644 index 0000000..2eb14a1 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddForeignKeyMessage.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; + +public abstract class AddForeignKeyMessage extends EventMessage { + protected AddForeignKeyMessage() { + super(EventType.ADD_FOREIGNKEY); + } + + /** + * Getter for list of foreign keys. + * @return List of SQLForeignKey + */ + public abstract List getForeignKeys() throws Exception; +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPrimaryKeyMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPrimaryKeyMessage.java new file mode 100644 index 0000000..0e899ad --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPrimaryKeyMessage.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.messaging; + +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; + +public abstract class AddPrimaryKeyMessage extends EventMessage { + protected AddPrimaryKeyMessage() { + super(EventType.ADD_PRIMARYKEY); + } + + /** + * Getter for list of primary keys. + * @return List of SQLPrimaryKey + */ + public abstract List getPrimaryKeys() throws Exception; +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropConstraintMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropConstraintMessage.java new file mode 100644 index 0000000..6e691e9 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropConstraintMessage.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging; + +public abstract class DropConstraintMessage extends EventMessage { + protected DropConstraintMessage() { + super(EventType.DROP_CONSTRAINT); + } + + public abstract String getTable(); + + public abstract String getConstraint(); +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java index 1ec0de0..68facce 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -43,7 +43,10 @@ DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT), CREATE_INDEX(MessageFactory.CREATE_INDEX_EVENT), DROP_INDEX(MessageFactory.DROP_INDEX_EVENT), - ALTER_INDEX(MessageFactory.ALTER_INDEX_EVENT); + ALTER_INDEX(MessageFactory.ALTER_INDEX_EVENT), + ADD_PRIMARYKEY(MessageFactory.ADD_PRIMARYKEY_EVENT), + ADD_FOREIGNKEY(MessageFactory.ADD_FOREIGNKEY_EVENT), + DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT); private String typeString; diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java index 515c455..fce6f62 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -58,7 +58,12 @@ public EventMessage getEventMessage(String eventTypeString, String messageBody) return getAlterIndexMessage(messageBody); case INSERT: return getInsertMessage(messageBody); - + case ADD_PRIMARYKEY: + return getAddPrimaryKeyMessage(messageBody); + case ADD_FOREIGNKEY: + return getAddForeignKeyMessage(messageBody); + case DROP_CONSTRAINT: + return getDropConstraintMessage(messageBody); default: throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); } @@ -140,6 +143,21 @@ public EventMessage getEventMessage(String eventTypeString, String messageBody) */ public abstract InsertMessage getInsertMessage(String messageBody); + /** + * Method to de-serialize AddPrimaryKeyMessage instance. + */ + public abstract AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody); + + /** + * Method to de-serialize AddForeignKeyMessage instance. + */ + public abstract AddForeignKeyMessage getAddForeignKeyMessage(String messageBody); + + /** + * Method to de-serialize DropConstraintMessage instance. + */ + public abstract DropConstraintMessage getDropConstraintMessage(String messageBody); + + // Protection against construction. 
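/*
 * Consumers route through getEventMessage(), so the new switch cases above are what make the
 * constraint notifications consumable as typed messages. A read-side sketch (the
 * NotificationEvent variable `event` is assumed to come from the metastore notification log;
 * LOG is an assumed SLF4J logger):
 *
 *   MessageDeserializer md = new JSONMessageDeserializer();
 *   EventMessage msg = md.getEventMessage(event.getEventType(), event.getMessage());
 *   if (msg instanceof DropConstraintMessage) {
 *     DropConstraintMessage dcm = (DropConstraintMessage) msg;
 *     LOG.info("drop constraint {} on {}.{}", dcm.getConstraint(), dcm.getDB(), dcm.getTable());
 *   }
 */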
protected MessageDeserializer() {} } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index c632ca4..e37a091 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -25,10 +25,13 @@ import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.util.ReflectionUtils; import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -51,7 +54,9 @@ public static final String CREATE_INDEX_EVENT = "CREATE_INDEX"; public static final String DROP_INDEX_EVENT = "DROP_INDEX"; public static final String ALTER_INDEX_EVENT = "ALTER_INDEX"; - + public static final String ADD_PRIMARYKEY_EVENT = "ADD_PRIMARYKEY"; + public static final String ADD_FOREIGNKEY_EVENT = "ADD_FOREIGNKEY"; + public static final String DROP_CONSTRAINT_EVENT = "DROP_CONSTRAINT"; private static MessageFactory instance = null; @@ -241,4 +246,30 @@ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Pa */ public abstract InsertMessage buildInsertMessage(String db, String table, Map partVals, Iterator files); + + /*** + * Factory method for building add primary key message + * + * @param pks list of primary keys + * @return instance of AddPrimaryKeyMessage + */ + public abstract AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List pks); + + /*** + * Factory method for building add foreign key message + * + * @param fks list of foreign keys + * @return instance of AddForeignKeyMessage + */ + public abstract AddForeignKeyMessage buildAddForeignKeyMessage(List fks); + + /*** + * Factory method for building drop constraint message + * @param dbName + * @param tableName + * @param constraintName + * @return + */ + public abstract DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName, + String constraintName); } diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java new file mode 100644 index 0000000..9c04c15 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddForeignKeyMessage.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of AddForeignKeyMessage + */ +public class JSONAddForeignKeyMessage extends AddForeignKeyMessage { + + @JsonProperty + String server, servicePrincipal; + + @JsonProperty + Long timestamp; + + @JsonProperty + List foreignKeyListJson; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAddForeignKeyMessage() { + } + + public JSONAddForeignKeyMessage(String server, String servicePrincipal, List fks, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.timestamp = timestamp; + this.foreignKeyListJson = new ArrayList(); + try { + for (SQLForeignKey pk : fks) { + foreignKeyListJson.add(JSONMessageFactory.createForeignKeyObjJson(pk)); + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return null; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List getForeignKeys() throws Exception { + List fks = new ArrayList(); + for (String pkJson : foreignKeyListJson) { + fks.add((SQLForeignKey)JSONMessageFactory.getTObj(pkJson, SQLForeignKey.class)); + } + return fks; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java new file mode 100644 index 0000000..2551cbf --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAddPrimaryKeyMessage.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.json; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of AddPrimaryKeyMessage + */ +public class JSONAddPrimaryKeyMessage extends AddPrimaryKeyMessage { + + @JsonProperty + String server, servicePrincipal; + + @JsonProperty + Long timestamp; + + @JsonProperty + List primaryKeyListJson; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAddPrimaryKeyMessage() { + } + + public JSONAddPrimaryKeyMessage(String server, String servicePrincipal, List pks, + Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.timestamp = timestamp; + this.primaryKeyListJson = new ArrayList(); + try { + for (SQLPrimaryKey pk : pks) { + primaryKeyListJson.add(JSONMessageFactory.createPrimaryKeyObjJson(pk)); + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize: ", e); + } + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return null; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public List getPrimaryKeys() throws Exception { + List pks = new ArrayList(); + for (String pkJson : primaryKeyListJson) { + pks.add((SQLPrimaryKey)JSONMessageFactory.getTObj(pkJson, SQLPrimaryKey.class)); + } + return pks; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropConstraintMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropConstraintMessage.java new file mode 100644 index 0000000..4d3422b --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONDropConstraintMessage.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * JSON implementation of DropConstraintMessage + */ +public class JSONDropConstraintMessage extends DropConstraintMessage { + + @JsonProperty + String server, servicePrincipal, dbName, tableName, constraintName; + + @JsonProperty + Long timestamp; + + /** + * Default constructor, needed for Jackson. + */ + public JSONDropConstraintMessage() { + } + + public JSONDropConstraintMessage(String server, String servicePrincipal, String dbName, + String tableName, String constraintName, Long timestamp) { + this.server = server; + this.servicePrincipal = servicePrincipal; + this.timestamp = timestamp; + this.dbName = dbName; + this.tableName = tableName; + this.constraintName = constraintName; + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getDB() { + return dbName; + } + + @Override + public String getTable() { + return tableName; + } + + @Override + public String getConstraint() { + return constraintName; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java index 41732c7..ce2dc23 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore.messaging.json; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; @@ -27,6 +29,7 @@ import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage; @@ -186,4 +189,31 @@ public InsertMessage getInsertMessage(String messageBody) { throw new IllegalArgumentException("Could not construct InsertMessage", e); } } + + @Override + public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAddPrimaryKeyMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AddPrimaryKeyMessage", e); + } + } + + @Override + public AddForeignKeyMessage getAddForeignKeyMessage(String 
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index 41732c7..ce2dc23 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.metastore.messaging.json;
 
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -27,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage;
@@ -186,4 +189,31 @@ public InsertMessage getInsertMessage(String messageBody) {
       throw new IllegalArgumentException("Could not construct InsertMessage", e);
     }
   }
+
+  @Override
+  public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONAddPrimaryKeyMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct AddPrimaryKeyMessage", e);
+    }
+  }
+
+  @Override
+  public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONAddForeignKeyMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct AddForeignKeyMessage", e);
+    }
+  }
+
+  @Override
+  public DropConstraintMessage getDropConstraintMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONDropConstraintMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct DropConstraintMessage", e);
+    }
+  }
 }
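A hypothetical consumer-side dispatch over the new deserializer methods. Only getAddPrimaryKeyMessage and getDropConstraintMessage are from this patch; the wrapper class and the replay comments are illustrative scaffolding.

    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
    import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
    import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
    import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
    import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

    public class ConstraintEventHandler {
      static void handle(NotificationEvent event) throws Exception {
        MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
        if (EventType.ADD_PRIMARYKEY.toString().equals(event.getEventType())) {
          List<SQLPrimaryKey> pks = md.getAddPrimaryKeyMessage(event.getMessage()).getPrimaryKeys();
          // replay as ALTER TABLE ... ADD CONSTRAINT ... PRIMARY KEY on the destination
        } else if (EventType.DROP_CONSTRAINT.toString().equals(event.getEventType())) {
          DropConstraintMessage dcm = md.getDropConstraintMessage(event.getMessage());
          // replay as ALTER TABLE dcm.getTable() DROP CONSTRAINT dcm.getConstraint()
        }
      }
    }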
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index a6ae8de..2431ec0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -33,8 +33,12 @@
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -42,6 +46,7 @@
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropIndexMessage;
@@ -172,6 +177,23 @@ public InsertMessage buildInsertMessage(String db, String table,
   }
 
+  @Override
+  public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) {
+    return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now());
+  }
+
+  @Override
+  public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) {
+    return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now());
+  }
+
+  @Override
+  public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName,
+      String constraintName) {
+    return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName,
+        constraintName, now());
+  }
+
   private long now() {
     return System.currentTimeMillis() / 1000;
   }
@@ -194,6 +216,16 @@ private long now() {
     }));
   }
 
+  static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(primaryKeyObj, "UTF-8");
+  }
+
+  static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    return serializer.toString(foreignKeyObj, "UTF-8");
+  }
+
   static String createTableObjJson(Table tableObj) throws TException {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     return serializer.toString(tableObj, "UTF-8");
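The create*ObjJson helpers serialize the Thrift-generated key objects with Thrift's TJSONProtocol rather than Jackson; getTObj (used by getPrimaryKeys earlier) is the inverse. A self-contained sketch of that round trip, assuming only the Thrift classes named above:

    import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TJSONProtocol;

    public class ThriftJsonRoundTrip {
      public static void main(String[] args) throws TException {
        SQLPrimaryKey pk = new SQLPrimaryKey();
        pk.setPk_name("pk_t1");

        // Thrift object -> JSON string, as in createPrimaryKeyObjJson.
        String pkJson = new TSerializer(new TJSONProtocol.Factory()).toString(pk, "UTF-8");

        // JSON string -> Thrift object, roughly what JSONMessageFactory.getTObj does.
        SQLPrimaryKey copy = new SQLPrimaryKey();
        new TDeserializer(new TJSONProtocol.Factory()).deserialize(copy, pkJson, "UTF-8");
      }
    }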
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index f64b08d..73bd828 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -846,10 +846,10 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
   }
 
   @Override
-  public void createTableWithConstraints(Table tbl,
+  public List<String> createTableWithConstraints(Table tbl,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
       throws InvalidObjectException, MetaException {
-    // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
@@ -859,14 +859,14 @@ public void dropConstraint(String dbName, String tableName,
   }
 
   @Override
-  public void addPrimaryKeys(List<SQLPrimaryKey> pks)
+  public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
       throws InvalidObjectException, MetaException {
-    // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
-  public void addForeignKeys(List<SQLForeignKey> fks)
+  public List<String> addForeignKeys(List<SQLForeignKey> fks)
      throws InvalidObjectException, MetaException {
-    // TODO Auto-generated method stub
+    return null;
   }
 }
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 2682886..1c1b558 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -862,10 +862,11 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
   }
 
   @Override
-  public void createTableWithConstraints(Table tbl,
+  public List<String> createTableWithConstraints(Table tbl,
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
       throws InvalidObjectException, MetaException {
     // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
@@ -875,15 +876,17 @@ public void dropConstraint(String dbName, String tableName,
   }
 
   @Override
-  public void addPrimaryKeys(List<SQLPrimaryKey> pks)
+  public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
       throws InvalidObjectException, MetaException {
     // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
-  public void addForeignKeys(List<SQLForeignKey> fks)
+  public List<String> addForeignKeys(List<SQLForeignKey> fks)
      throws InvalidObjectException, MetaException {
     // TODO Auto-generated method stub
+    return null;
   }
 }
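The RawStore signature change means implementations now report which constraints they added, presumably so callers can learn the final (possibly auto-generated) constraint names; the test dummies simply return null. A real store might look roughly like the following hypothetical sketch, which is not ObjectStore's actual code:

    // Inside a RawStore implementation (sketch only; persistence elided):
    @Override
    public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
        throws InvalidObjectException, MetaException {
      List<String> constraintNames = new ArrayList<String>();
      for (SQLPrimaryKey pk : pks) {
        // ... persist pk in the backing store ...
        constraintNames.add(pk.getPk_name());
      }
      return constraintNames;
    }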
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2b327db..63669fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -31,10 +31,15 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
@@ -116,6 +121,9 @@
   EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
   EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
   EVENT_INSERT("EVENT_INSERT"),
+  EVENT_ADD_PRIMARYKEY("EVENT_ADD_PRIMARYKEY"),
+  EVENT_ADD_FOREIGNKEY("EVENT_ADD_FOREIGNKEY"),
+  EVENT_DROP_CONSTRAINT("EVENT_DROP_CONSTRAINT"),
   EVENT_UNKNOWN("EVENT_UNKNOWN");
 
   String type = null;
@@ -646,6 +654,27 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         dmd.write();
         break;
       }
+      case MessageFactory.ADD_PRIMARYKEY_EVENT: {
+        LOG.info("Processing#{} ADD_PRIMARYKEY_EVENT message : {}", ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PRIMARYKEY, evid, evid, cmRoot);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
+      }
+      case MessageFactory.ADD_FOREIGNKEY_EVENT: {
+        LOG.info("Processing#{} ADD_FOREIGNKEY_EVENT message : {}", ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_FOREIGNKEY, evid, evid, cmRoot);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
+      }
+      case MessageFactory.DROP_CONSTRAINT_EVENT: {
+        LOG.info("Processing#{} DROP_CONSTRAINT_EVENT message : {}", ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_CONSTRAINT, evid, evid, cmRoot);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
+      }
       // TODO : handle other event types
       default:
         LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage());
@@ -1157,6 +1186,70 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         return analyzeTableLoad(
             insertMessage.getDB(), insertMessage.getTable(), locn, precursor, dbsUpdated, tablesUpdated);
       }
+      case EVENT_ADD_PRIMARYKEY: {
+        AddPrimaryKeyMessage addPrimaryKeyMessage = md.getAddPrimaryKeyMessage(dmd.getPayload());
+        List<SQLPrimaryKey> pks = null;
+        try {
+          pks = addPrimaryKeyMessage.getPrimaryKeys();
+        } catch (Exception e) {
+          if (!(e instanceof SemanticException)) {
+            throw new SemanticException("Error reading message members", e);
+          } else {
+            throw (SemanticException) e;
+          }
+        }
+        for (SQLPrimaryKey pk : pks) {
+          pk.setTable_db(dbName);
+        }
+        AlterTableDesc addConstraintsDesc =
+            new AlterTableDesc(tblName, pks, new ArrayList<SQLForeignKey>());
+        Task<? extends Serializable> addConstraintsTask =
+            TaskFactory.get(new DDLWork(inputs, outputs, addConstraintsDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(addConstraintsTask);
+        }
+        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+        tasks.add(addConstraintsTask);
+        LOG.debug("Added add constraints task : {}:{}", addConstraintsTask.getId(), tblName);
+        return tasks;
+      }
+      case EVENT_ADD_FOREIGNKEY: {
+        AddForeignKeyMessage addForeignKeyMessage = md.getAddForeignKeyMessage(dmd.getPayload());
+        List<SQLForeignKey> fks = null;
+        try {
+          fks = addForeignKeyMessage.getForeignKeys();
+        } catch (Exception e) {
+          if (!(e instanceof SemanticException)) {
+            throw new SemanticException("Error reading message members", e);
+          } else {
+            throw (SemanticException) e;
+          }
+        }
+        for (SQLForeignKey fk : fks) {
+          fk.setPktable_db(dbName);
+          fk.setFktable_db(dbName);
+        }
+        AlterTableDesc addConstraintsDesc =
+            new AlterTableDesc(tblName, new ArrayList<SQLPrimaryKey>(), fks);
+        Task<? extends Serializable> addConstraintsTask =
+            TaskFactory.get(new DDLWork(inputs, outputs, addConstraintsDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(addConstraintsTask);
+        }
+        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+        tasks.add(addConstraintsTask);
+        LOG.debug("Added add constraints task : {}:{}", addConstraintsTask.getId(), tblName);
+        return tasks;
+      }
+      case EVENT_DROP_CONSTRAINT: {
+        DropConstraintMessage dropConstraintMessage = md.getDropConstraintMessage(dmd.getPayload());
+        String constraintName = dropConstraintMessage.getConstraint();
+        AlterTableDesc dropConstraintsDesc =
+            new AlterTableDesc(dbName + "." + dropConstraintMessage.getTable(), constraintName);
+        Task<? extends Serializable> dropConstraintsTask =
+            TaskFactory.get(new DDLWork(inputs, outputs, dropConstraintsDesc), conf);
+        if (precursor != null) {
+          precursor.addDependentTask(dropConstraintsTask);
+        }
+        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
+        tasks.add(dropConstraintsTask);
+        LOG.debug("Added drop constraints task : {}:{}", dropConstraintsTask.getId(), constraintName);
+        return tasks;
+      }
       case EVENT_UNKNOWN: {
         break;
       }
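Taken together with the dump-side cases above, the intended incremental cycle looks roughly like this in TestReplicationScenarios style. run() and getResult() stand in for the test harness's driver calls, and the identifiers are illustrative:

    // On the source warehouse:
    run("CREATE TABLE testdb.t1 (id INT, PRIMARY KEY (id) DISABLE NOVALIDATE)");

    // An incremental dump picks the ADD_PRIMARYKEY notification out of the event log
    // and writes it as an EVENT_ADD_PRIMARYKEY DumpMetaData entry.
    run("REPL DUMP testdb FROM " + lastReplId);
    String dumpLocn = getResult(0, 0);

    // On the destination, REPL LOAD replays the payload as an
    // ALTER TABLE ... ADD CONSTRAINT task built from AlterTableDesc.
    run("REPL LOAD testdb_replica FROM '" + dumpLocn + "'");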