diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 5a187f4..ec41537 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -163,26 +163,54 @@ private synchronized void advanceDumpDir() { } static class Tuple { - final String replicatedDbName; - final String lastReplicationId; + final String dumpLocation; + final String lastReplId; - Tuple(String replicatedDbName, String lastReplicationId) { - this.replicatedDbName = replicatedDbName; - this.lastReplicationId = lastReplicationId; + Tuple(String dumpLocation, String lastReplId) { + this.dumpLocation = dumpLocation; + this.lastReplId = lastReplId; } } - private Tuple loadAndVerify(String dbName) throws IOException { + private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IOException { + return incrementalLoadAndVerify(dbName, null, replDbName); + } + + private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String replDbName) throws IOException { + Tuple dump = replDumpDb(dbName, fromReplId, null, null); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + return dump; + } + + private Tuple dumpDbFromLastDump(String dbName, Tuple lastDump) throws IOException { + return replDumpDb(dbName, lastDump.lastReplId, null, null); + } + + private Tuple replDumpDb(String dbName, String fromReplID, String toReplID, String limit) throws IOException { advanceDumpDir(); - run("REPL DUMP " + dbName); + String dumpCmd = "REPL DUMP " + dbName; + if (null != fromReplID) { + dumpCmd = dumpCmd + " FROM " + fromReplID; + } + if (null != toReplID) { + dumpCmd = dumpCmd + " TO " + toReplID; + } + if (null != limit) { + dumpCmd = dumpCmd + " LIMIT " + limit; + } + run(dumpCmd); String dumpLocation = getResult(0, 0); - String lastReplicationId = getResult(0, 1, true); - String replicatedDbName = dbName + "_replicated"; - run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + String lastReplId = getResult(0, 1, true); + LOG.info("Dumped to {} with id {} for command: {}", dumpLocation, lastReplId, dumpCmd); + return new Tuple(dumpLocation, lastReplId); + } + + private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException { + run("EXPLAIN REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'"); printOutput(); - run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); - verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId); - return new Tuple(replicatedDbName, lastReplicationId); + run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'"); + verifyRun("REPL STATUS " + replDbName, lastReplId); + return; } /** @@ -222,7 +250,8 @@ public void testBasic() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); - String replicatedDbName = loadAndVerify(dbName).replicatedDbName; + String replicatedDbName = dbName + "_dupe"; + bootstrapLoadAndVerify(dbName, replicatedDbName); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1); @@ -2069,6 +2098,194 @@ public void testTruncateWithCM() throws IOException { } @Test + public void 
testIncrementalRepeatEventOnExistingObject() throws IOException { + String testName = "incrementalRepeatEventOnExistingObject"; + String dbName = createDB(testName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + + // Bootstrap dump/load + String replDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + + // List to maintain the incremental dumps for each operation + List incrementalDumpList = new ArrayList(); + + String[] empty = new String[] {}; + String[] unptn_data = new String[] { "ten" }; + String[] ptn_data_1 = new String[] { "fifteen" }; + String[] ptn_data_2 = new String[] { "seventeen" }; + + // INSERT EVENT to unpartitioned table + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table with dynamic ADD_PARTITION + run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // ADD_PARTITION EVENT to partitioned table + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table on existing partition + run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // TRUNCATE_PARTITION EVENT on partitioned table + run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // TRUNCATE_TABLE EVENT on unpartitioned table + run("TRUNCATE TABLE " + dbName + ".unptned"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // CREATE_TABLE EVENT with multiple partitions + run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // Replicate all the events that have happened so far + Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2); + + // Load each incremental dump from the list. Each dump has only one operation.
+ for (Tuple currDump : incrementalDumpList) { + // Load the incremental dump and ensure it does nothing and lastReplId remains the same + loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId); + + // Verify that the data remain intact even after an already applied event is applied again on existing objects + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty); + verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2); + } + } + + @Test + public void testIncrementalRepeatEventOnMissingObject() throws IOException { + String testName = "incrementalRepeatEventOnMissingObject"; + String dbName = createDB(testName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + + // Bootstrap dump/load + String replDbName = dbName + "_dupe"; + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + + // List to maintain the incremental dumps for each operation + List incrementalDumpList = new ArrayList(); + + String[] empty = new String[] {}; + String[] unptn_data = new String[] { "ten" }; + String[] ptn_data_1 = new String[] { "fifteen" }; + String[] ptn_data_2 = new String[] { "seventeen" }; + + // INSERT EVENT to unpartitioned table + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table with dynamic ADD_PARTITION + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // ADD_PARTITION EVENT to partitioned table + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table on existing partition + run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // TRUNCATE_PARTITION EVENT on partitioned table + run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // TRUNCATE_TABLE EVENT on unpartitioned table + run("TRUNCATE TABLE " + dbName + ".unptned"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // CREATE_TABLE EVENT on partitioned table + run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table with dynamic ADD_PARTITION + run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // INSERT EVENT to partitioned table with dynamic ADD_PARTITION + run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')"); + replDump =
dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // DROP_PARTITION EVENT to partitioned table + run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // RENAME_PARTITION EVENT to partitioned table + run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // RENAME_TABLE EVENT to unpartitioned table + run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // DROP_TABLE EVENT to partitioned table + run("DROP TABLE " + dbName + ".ptned_tmp"); + replDump = dumpDbFromLastDump(dbName, replDump); + incrementalDumpList.add(replDump); + + // Replicate all the events that have happened so far + Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName); + + verifyIfTableNotExist(replDbName, "unptned"); + verifyIfTableNotExist(replDbName, "ptned_tmp"); + verifyIfTableExist(replDbName, "unptned_new"); + verifyIfTableExist(replDbName, "ptned"); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1"))); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2"))); + verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20"))); + + // Load each incremental dump from the list. Each dump has only one operation. + for (Tuple currDump : incrementalDumpList) { + // Load the current incremental dump and ensure it does nothing and lastReplId remains the same + loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId); + + // Verify that the data remain intact even after an already applied event is applied again on missing objects + verifyIfTableNotExist(replDbName, "unptned"); + verifyIfTableNotExist(replDbName, "ptned_tmp"); + verifyIfTableExist(replDbName, "unptned_new"); + verifyIfTableExist(replDbName, "ptned"); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1"))); + verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2"))); + verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20"))); + } + } + + @Test public void testStatus() throws IOException { // first test ReplStateMap functionality Map cmap = new ReplStateMap(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 41e834d..d2eb8c4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -102,6 +102,13 @@ public void testCreateFunctionIncrementalReplication() throws Throwable { .verify(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") .verify(replicatedDbName + ".testFunction"); + + // Test the idempotent behavior of CREATE FUNCTION + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verify(incrementalDump.lastReplicationId) + .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") + .verify(replicatedDbName + ".testFunction"); } @Test @@ -123,6 +130,13 @@ public void
testDropFunctionIncrementalReplication() throws Throwable { .verify(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '*testfunction*'") .verify(null); + + // Test the idempotent behavior of DROP FUNCTION + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verify(incrementalDump.lastReplicationId) + .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .verify(null); } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index a35f7b2..5659860 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -98,11 +98,8 @@ private void initialize(String cmRoot) throws Exception { System.setProperty("datanucleus.mapping.Schema", schemaName); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true"); - - int metaStorePort = MetaStoreUtils.startMetaStore(hiveConf); hiveConf.setVar(HiveConf.ConfVars.REPLDIR, - hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/"); - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort); + hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/"); hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); @@ -110,6 +107,9 @@ private void initialize(String cmRoot) throws Exception { System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); + int metaStorePort = MetaStoreUtils.startMetaStore(hiveConf); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort); + Path testPath = new Path(hiveWarehouseLocation); FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf); testPathFileSystem.mkdirs(testPath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 87928ee..b6ba48b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -934,6 +934,12 @@ private int alterDatabase(Hive db, AlterDatabaseDesc alterDbDesc) throws HiveExc case ALTER_PROPERTY: Map newParams = alterDbDesc.getDatabaseProperties(); Map params = database.getParameters(); + + if (!alterDbDesc.getReplicationSpec().allowEventReplacementInto(params)) { + LOG.debug("DDLTask: Alter Database {} is skipped as database is newer than update", dbName); + return 0; // no replacement, the existing database state is newer than our update. 
+ } + // if both old and new params are not null, merge them if (params != null && newParams != null) { params.putAll(newParams); @@ -1117,10 +1123,19 @@ private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws Hiv * @throws HiveException */ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) throws HiveException { + String tableName = renamePartitionDesc.getTableName(); + LinkedHashMap oldPartSpec = renamePartitionDesc.getOldPartSpec(); - Table tbl = db.getTable(renamePartitionDesc.getTableName()); + if (!allowOperationInReplicationScope(db, tableName, oldPartSpec, renamePartitionDesc.getReplicationSpec())) { + // no rename, the table is missing either due to drop/rename which follows the current rename. + // or the existing table is newer than our update. + LOG.debug("DDLTask: Rename Partition is skipped as table {} / partition {} is newer than update", + tableName, + FileUtils.makePartName(new ArrayList(oldPartSpec.keySet()), new ArrayList(oldPartSpec.values()))); + return 0; + } - LinkedHashMap oldPartSpec = renamePartitionDesc.getOldPartSpec(); + Table tbl = db.getTable(tableName); Partition oldPart = db.getPartition(tbl, oldPartSpec, false); if (oldPart == null) { String partName = FileUtils.makePartName(new ArrayList(oldPartSpec.keySet()), @@ -1131,8 +1146,7 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th Partition part = db.getPartition(tbl, oldPartSpec, false); part.setValues(renamePartitionDesc.getNewPartSpec()); db.renamePartition(tbl, oldPartSpec, part); - Partition newPart = db - .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false); + Partition newPart = db.getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false); work.getInputs().add(new ReadEntity(oldPart)); // We've already obtained a lock on the table, don't lock the partition too addIfAbsentByName(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -3559,6 +3573,13 @@ static StringBuilder appendNonNull(StringBuilder builder, Object value, boolean * Throws this exception if an unexpected error occurs. */ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { + if (!allowOperationInReplicationScope(db, alterTbl.getOldName(), null, alterTbl.getReplicationSpec())) { + // no alter, the table is missing either due to drop/rename which follows the alter. + // or the existing table is newer than our update. + LOG.debug("DDLTask: Alter Table is skipped as table {} is newer than update", alterTbl.getOldName()); + return 0; + } + // alter the table Table tbl = db.getTable(alterTbl.getOldName()); @@ -4191,19 +4212,20 @@ private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveExc * drop the partitions inside it that are older than this event. To wit, DROP TABLE FOR REPL * acts like a recursive DROP TABLE IF OLDER. */ - if (!replicationSpec.allowEventReplacementInto(tbl)){ + if (!replicationSpec.allowEventReplacementInto(tbl.getParameters())){ // Drop occured as part of replicating a drop, but the destination // table was newer than the event being replicated. Ignore, but drop // any partitions inside that are older. 
if (tbl.isPartitioned()){ - PartitionIterable partitions = new PartitionIterable(db,tbl,null,conf.getIntVar( - HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + PartitionIterable partitions = new PartitionIterable(db,tbl,null, + conf.getIntVar(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); for (Partition p : Iterables.filter(partitions, replicationSpec.allowEventReplacementInto())){ db.dropPartition(tbl.getDbName(),tbl.getTableName(),p.getValues(),true); } } + LOG.debug("DDLTask: Drop Table is skipped as table {} is newer than update", dropTbl.getTableName()); return; // table is newer, leave it be. } } @@ -4370,10 +4392,12 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { // trigger replace-mode semantics. Table existingTable = db.getTable(tbl.getDbName(), tbl.getTableName(), false); if (existingTable != null){ - if (!crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable)){ - return 0; // no replacement, the existing table state is newer than our update. - } else { + if (crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())){ crtTbl.setReplaceMode(true); // we replace existing table. + } else { + LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update", + crtTbl.getTableName()); + return 0; // no replacement, the existing table state is newer than our update. } } } @@ -4703,6 +4727,15 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H String tableName = truncateTableDesc.getTableName(); Map partSpec = truncateTableDesc.getPartSpec(); + if (!allowOperationInReplicationScope(db, tableName, partSpec, truncateTableDesc.getReplicationSpec())) { + // no truncate, the table is missing either due to drop/rename which follows the truncate. + // or the existing table is newer than our update. + LOG.debug("DDLTask: Truncate Table/Partition is skipped as table {} / partition {} is newer than update", + tableName, + (partSpec == null) ? "null" : FileUtils.makePartName(new ArrayList(partSpec.keySet()), new ArrayList(partSpec.values()))); + return 0; + } + try { db.truncateTable(tableName, partSpec); } catch (Exception e) { @@ -4829,6 +4862,45 @@ private void makeLocationQualified(Database database) throws HiveException { } } + /** + * Validate if the given table/partition is eligible for update + * + * @param db Database. + * @param tableName Table name of format db.table + * @param partSpec Partition spec for the partition + * @param replicationSpec Replications specification + * + * @return boolean true if allow the operation + * @throws HiveException + */ + private boolean allowOperationInReplicationScope(Hive db, String tableName, + Map partSpec, ReplicationSpec replicationSpec) throws HiveException { + if ((null == replicationSpec) || (!replicationSpec.isInReplicationScope())) { + // Always allow the operation if it is not in replication scope. + return true; + } + // If the table/partition exist and is older than the event, then just apply + // the event else noop. + Table existingTable = db.getTable(tableName, false); + if ((existingTable != null) + && replicationSpec.allowEventReplacementInto(existingTable.getParameters())) { + // Table exists and is older than the update. Now, need to ensure if update allowed on the + // partition. 
+ if (partSpec != null) { + Partition existingPtn = db.getPartition(existingTable, partSpec, false); + return ((existingPtn != null) + && replicationSpec.allowEventReplacementInto(existingPtn.getParameters())); + } + + // Replacement is allowed as the existing table is older than event + return true; + } + + // The table is missing either due to drop/rename which follows the operation. + // Or the existing table is newer than our update. So, don't allow the update. + return false; + } + public static boolean doesTableNeedLocation(Table tbl) { // TODO: If we are ok with breaking compatibility of existing 3rd party StorageHandlers, // this method could be moved to the HiveStorageHandler interface. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index 42cdc84..0f990e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -29,7 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; @@ -77,6 +77,19 @@ public int execute(DriverContext driverContext) { return createTemporaryFunction(createFunctionDesc); } else { try { + if (createFunctionDesc.getReplicationSpec().isInReplicationScope()) { + String[] qualifiedNameParts = FunctionUtils.getQualifiedFunctionNameParts( + createFunctionDesc.getFunctionName()); + String dbName = qualifiedNameParts[0]; + String funcName = qualifiedNameParts[1]; + Map dbProps = Hive.get().getDatabase(dbName).getParameters(); + if (!createFunctionDesc.getReplicationSpec().allowEventReplacementInto(dbProps)) { + // If the database is newer than the create event, then noop it. + LOG.debug("FunctionTask: Create Function {} is skipped as database {} " + + "is newer than update", funcName, dbName); + return 0; + } + } return createPermanentFunction(Hive.get(conf), createFunctionDesc); } catch (Exception e) { setException(e); @@ -92,6 +105,19 @@ public int execute(DriverContext driverContext) { return dropTemporaryFunction(dropFunctionDesc); } else { try { + if (dropFunctionDesc.getReplicationSpec().isInReplicationScope()) { + String[] qualifiedNameParts = FunctionUtils.getQualifiedFunctionNameParts( + dropFunctionDesc.getFunctionName()); + String dbName = qualifiedNameParts[0]; + String funcName = qualifiedNameParts[1]; + Map dbProps = Hive.get().getDatabase(dbName).getParameters(); + if (!dropFunctionDesc.getReplicationSpec().allowEventReplacementInto(dbProps)) { + // If the database is newer than the drop event, then noop it. 
+ LOG.debug("FunctionTask: Drop Function {} is skipped as database {} " + + "is newer than update", funcName, dbName); + return 0; + } + } return dropPermanentFunction(Hive.get(conf), dropFunctionDesc); } catch (Exception e) { setException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 88c73f0..73710a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2192,7 +2192,7 @@ public Partition createPartition(Table tbl, Map partSpec) throws try { org.apache.hadoop.hive.metastore.api.Partition ptn = getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues()); - if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn)){ + if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){ partsToAlter.add(p); } // else ptn already exists, but we do nothing with it. } catch (NoSuchObjectException nsoe){ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index e9a4ff0..fec8bf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -697,7 +697,7 @@ private void analyzeAlterDatabaseProperties(ASTNode ast) throws SemanticExceptio throw new SemanticException("Unrecognized token in CREATE DATABASE statement"); } } - AlterDatabaseDesc alterDesc = new AlterDatabaseDesc(dbName, dbProps); + AlterDatabaseDesc alterDesc = new AlterDatabaseDesc(dbName, dbProps, null); addAlterDbDesc(alterDesc); } @@ -946,7 +946,7 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { } } - TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec); + TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec, null); DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), truncateTblDesc); Task truncateTask = TaskFactory.get(ddlWork, conf); @@ -2622,7 +2622,7 @@ private void analyzeAlterTableRename(String[] source, ASTNode ast, boolean expec String sourceName = getDotName(source); String targetName = getDotName(target); - AlterTableDesc alterTblDesc = new AlterTableDesc(sourceName, targetName, expectView); + AlterTableDesc alterTblDesc = new AlterTableDesc(sourceName, targetName, expectView, null); addInputsOutputsAlterTable(sourceName, null, alterTblDesc); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf)); @@ -2741,7 +2741,7 @@ private void analyzeAlterTableRenamePart(ASTNode ast, String tblName, partSpecs.add(oldPartSpec); partSpecs.add(newPartSpec); addTablePartsOutputs(tab, partSpecs, WriteEntity.WriteType.DDL_EXCLUSIVE); - RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc(tblName, oldPartSpec, newPartSpec); + RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc(tblName, oldPartSpec, newPartSpec, null); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), renamePartitionDesc), conf)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java index a21b043..c538075 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java @@ 
-86,7 +86,7 @@ private void analyzeCreateFunction(ASTNode ast) throws SemanticException { } CreateFunctionDesc desc = - new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources); + new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources, null); rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf)); addEntities(functionName, isTemporaryFunction, resources); @@ -114,7 +114,7 @@ private void analyzeDropFunction(ASTNode ast) throws SemanticException { } boolean isTemporaryFunction = (ast.getFirstChildWithType(HiveParser.TOK_TEMPORARY) != null); - DropFunctionDesc desc = new DropFunctionDesc(functionName, isTemporaryFunction); + DropFunctionDesc desc = new DropFunctionDesc(functionName, isTemporaryFunction, null); rootTasks.add(TaskFactory.get(new FunctionWork(desc), conf)); addEntities(functionName, isTemporaryFunction, null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index dc86942..2d907ff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -792,21 +792,6 @@ private static void createReplImportTasks( Task dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; - if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){ - // If destination table exists, but is partitioned, and we think we're writing to an unpartitioned - // or if destination table exists, but is unpartitioned and we think we're writing to a partitioned - // table, then this can only happen because there are drops in the queue that are yet to be processed. - // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we - // drop and re-create. - if (replicationSpec.allowReplacementInto(table)){ - dr = dropTableTask(table, x); - lockType = WriteEntity.WriteType.DDL_EXCLUSIVE; - table = null; // null it out so we go into the table re-create flow. - } else { - return; // noop out of here. - } - } - // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying // to create tasks to create a table inside a db that as-of-now does not exist, but there is @@ -818,6 +803,20 @@ private static void createReplImportTasks( throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName())); } } + + if (table != null) { + if (!replicationSpec.allowReplacementInto(table.getParameters())) { + // If the target table exists and is newer than or same as the current update based on repl.last.id, then just noop it. + return; + } + } else { + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) { + // If the target database is newer than or same as the current update based on repl.last.id, then just noop it. + return; + } + } + if (tblDesc.getLocation() == null) { if (!waitOnPrecursor){ tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString()); } } - /* Note: In the following section, Metadata-only import handling logic is
The rule of thumb being - followed here is that MD-only imports are essentially ALTERs. They do - not load data, and should not be "creating" any metadata - they should - be replacing instead. The only place it makes sense for a MD-only import - to create is in the case of a table that's been dropped and recreated, - or in the case of an unpartitioned table. In all other cases, it should - behave like a noop or a pure MD alter. - */ - + /* Note: In the following section, Metadata-only import handling logic is + interleaved with regular repl-import logic. The rule of thumb being + followed here is that MD-only imports are essentially ALTERs. They do + not load data, and should not be "creating" any metadata - they should + be replacing instead. The only place it makes sense for a MD-only import + to create is in the case of a table that's been dropped and recreated, + or in the case of an unpartitioned table. In all other cases, it should + behave like a noop or a pure MD alter. + */ if (table == null) { // Either we're dropping and re-creating, or the table didn't exist, and we're creating. @@ -889,7 +887,7 @@ private static void createReplImportTasks( } else { // If replicating, then the partition already existing means we need to replace, maybe, if // the destination ptn's repl.last.id is older than the replacement's. - if (replicationSpec.allowReplacementInto(ptn)){ + if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); @@ -915,7 +913,7 @@ private static void createReplImportTasks( } } else { x.getLOG().debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table)){ + if (!replicationSpec.allowReplacementInto(table.getParameters())){ return; // silently return, table is newer than our replacement. } if (!replicationSpec.isMetadataOnly()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 961561d..5787d70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -668,11 +668,11 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { for (String tableName : tablesUpdated.keySet()){ // weird - AlterTableDesc requires a HashMap to update props instead of a Map. 
HashMap mapProp = new HashMap(); - mapProp.put( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - tablesUpdated.get(tableName).toString()); + String eventId = tablesUpdated.get(tableName).toString(); + + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); AlterTableDesc alterTblDesc = new AlterTableDesc( - AlterTableDesc.AlterTableTypes.ADDPROPS, null, false); + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId)); alterTblDesc.setProps(mapProp); alterTblDesc.setOldName(tableName); Task updateReplIdTask = TaskFactory.get( @@ -682,10 +682,10 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } for (String dbName : dbsUpdated.keySet()){ Map mapProp = new HashMap(); - mapProp.put( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - dbsUpdated.get(dbName).toString()); - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp); + String eventId = dbsUpdated.get(dbName).toString(); + + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId)); Task updateReplIdTask = TaskFactory.get( new DDLWork(inputs, outputs, alterDbDesc), conf); taskChainTail.addDependentTask(updateReplIdTask); @@ -786,7 +786,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) Task dbRootTask = null; if (existEmptyDb(dbName)) { - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters()); + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters(), null); dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); } else { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 1ea608b..4badea6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -20,7 +20,6 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; @@ -134,6 +133,10 @@ public ReplicationSpec(){ this((ASTNode)null); } + public ReplicationSpec(String fromId, String toId) { + this(true, false, fromId, toId, false, true, false); + } + public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy, boolean isReplace) { @@ -189,58 +192,28 @@ public static boolean allowReplacement(String currReplState, String replacementR } // First try to extract a long value from the strings, and compare them. - // If oldReplState is less-than or equal to newReplState, allow. + // If oldReplState is less-than newReplState, allow. long currReplStateLong = Long.parseLong(currReplState.replaceAll("\\D","")); long replacementReplStateLong = Long.parseLong(replacementReplState.replaceAll("\\D","")); - if ((currReplStateLong != 0) || (replacementReplStateLong != 0)){ - return ((currReplStateLong - replacementReplStateLong) <= 0); - } - - // If the long value of both is 0, though, fall back to lexical comparison. 
- - // Lexical comparison according to locale will suffice for now, future might add more logic - return (collator.compare(currReplState.toLowerCase(), replacementReplState.toLowerCase()) <= 0); + return ((currReplStateLong - replacementReplStateLong) < 0); } /** - * Determines if a current replication object(current state of dump) is allowed to - * replicate-replace-into a given partition - */ - public boolean allowReplacementInto(Partition ptn){ - return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()),this.getCurrentReplicationState()); - } - - /** - * Determines if a current replication object(current state of dump) is allowed to - * replicate-replace-into a given partition + * Determines if a current replication object (current state of dump) is allowed to + * replicate-replace-into a given metastore object (based on state_id stored in their parameters) */ - public boolean allowReplacementInto(org.apache.hadoop.hive.metastore.api.Partition ptn){ - return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()),this.getCurrentReplicationState()); + public boolean allowReplacementInto(Map params){ + return allowReplacement(getLastReplicatedStateFromParameters(params), + getCurrentReplicationState()); } /** - * Determines if a current replication event specification is allowed to - * replicate-replace-into a given partition + * Determines if a current replication event (based on event id) is allowed to + * replicate-replace-into a given metastore object (based on state_id stored in their parameters) */ - public boolean allowEventReplacementInto(Partition ptn){ - return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()),this.getReplicationState()); - } - - /** - * Determines if a current replication object(current state of dump) is allowed to - * replicate-replace-into a given table - */ - public boolean allowReplacementInto(Table table) { - return allowReplacement(getLastReplicatedStateFromParameters(table.getParameters()),this.getCurrentReplicationState()); - } - - /** - * Determines if a current replication event specification is allowed to - * replicate-replace-into a given table - */ - public boolean allowEventReplacementInto(Table table) { - return allowReplacement(getLastReplicatedStateFromParameters(table.getParameters()),this.getReplicationState()); + public boolean allowEventReplacementInto(Map params){ + return allowReplacement(getLastReplicatedStateFromParameters(params), getReplicationState()); } /** @@ -254,7 +227,7 @@ public boolean apply(@Nullable Partition partition) { if (partition == null){ return false; } - return (allowEventReplacementInto(partition)); + return (allowEventReplacementInto(partition.getParameters())); } }; } @@ -350,7 +323,6 @@ public void setLazy(boolean isLazy){ this.isLazy = isLazy; } - public String get(KEY key) { switch (key){ case REPL_SCOPE: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java index 95e51e4..d6a95bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java @@ -56,12 +56,4 @@ return databasesUpdated; } - ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException { - String eventId = forContext.dmd.getEventTo().toString(); - return replicationSpec(eventId, 
eventId); - } - - private ReplicationSpec replicationSpec(String fromId, String toId) throws SemanticException { - return new ReplicationSpec(true, false, fromId, toId, false, true, false); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index 452f506..a6d35cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; @@ -112,7 +113,7 @@ private FunctionDescBuilder(Context context) throws SemanticException { destinationDbName = context.isDbNameEmpty() ? metadata.function.getDbName() : context.dbName; } - private CreateFunctionDesc build() { + private CreateFunctionDesc build() throws SemanticException { replCopyTasks.clear(); PrimaryToReplicaResourceFunction conversionFunction = new PrimaryToReplicaResourceFunction(context, metadata, destinationDbName); @@ -127,8 +128,12 @@ private CreateFunctionDesc build() { String fullQualifiedFunctionName = FunctionUtils.qualifyFunctionName( metadata.function.getFunctionName(), destinationDbName ); + // For bootstrap load, the create function should be always performed. + // Only for incremental load, need to validate if event is newer than the database. + ReplicationSpec replSpec = (context.dmd == null) ? null : context.eventOnlyReplicationSpec(); return new CreateFunctionDesc( - fullQualifiedFunctionName, false, metadata.function.getClassName(), transformedUris + fullQualifiedFunctionName, false, metadata.function.getClassName(), + transformedUris, replSpec ); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java index daf7b2a..dae300f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java @@ -37,7 +37,8 @@ String actualDbName = context.isDbNameEmpty() ? 
msg.getDB() : context.dbName; String qualifiedFunctionName = FunctionUtils.qualifyFunctionName(msg.getFunctionName(), actualDbName); - DropFunctionDesc desc = new DropFunctionDesc(qualifiedFunctionName, false); + DropFunctionDesc desc = new DropFunctionDesc( + qualifiedFunctionName, false, context.eventOnlyReplicationSpec()); Task dropFunctionTask = TaskFactory.get(new FunctionWork(desc), context.hiveConf); context.log.debug( "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 131d672..771400e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -51,7 +51,7 @@ msg.getPartitions()); if (partSpecs.size() > 0) { DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName, - partSpecs, null, true, eventOnlyReplicationSpec(context)); + partSpecs, null, true, context.eventOnlyReplicationSpec()); Task dropPtnTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc), context.hiveConf diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index e6e06c3..3ee3949 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -37,8 +37,7 @@ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." 
+ actualTblName, - null, true, true, - eventOnlyReplicationSpec(context) + null, true, true, context.eventOnlyReplicationSpec() ); Task dropTableTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, dropTableDesc), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java index 840f95e..33c716f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; @@ -87,5 +88,10 @@ boolean isTableNameEmpty() { boolean isDbNameEmpty() { return StringUtils.isEmpty(dbName); } + + ReplicationSpec eventOnlyReplicationSpec() throws SemanticException { + String eventId = dmd.getEventTo().toString(); + return new ReplicationSpec(eventId, eventId); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 627fb46..5bd0532 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -59,14 +59,12 @@ : new SemanticException("Error reading message members", e); } - RenamePartitionDesc renamePtnDesc = - new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec); + RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc( + tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec()); Task renamePtnTask = TaskFactory.get( - new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf - ); - context.log - .debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, - newPartSpec); + new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf); + context.log.debug("Added rename ptn task : {}:{}->{}", + renamePtnTask.getId(), oldPartSpec, newPartSpec); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); tablesUpdated.put(tableName, context.dmd.getEventTo()); return Collections.singletonList(renamePtnTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 10f0753..4785e55 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -57,20 +57,21 @@ String oldName = oldDbName + "." + msg.getTableObjBefore().getTableName(); String newName = newDbName + "." 
+ msg.getTableObjAfter().getTableName(); - AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false); + AlterTableDesc renameTableDesc = new AlterTableDesc( + oldName, newName, false, context.eventOnlyReplicationSpec()); Task renameTableTask = TaskFactory.get( - new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf - ); - context.log.debug( - "Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName - ); + new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf); + context.log.debug("Added rename table task : {}:{}->{}", + renameTableTask.getId(), oldName, newName); + // oldDbName and newDbName *will* be the same if we're here databasesUpdated.put(newDbName, context.dmd.getEventTo()); tablesUpdated.remove(oldName); tablesUpdated.put(newName, context.dmd.getEventTo()); - // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated - // However, we explicitly don't support repl of that sort, and error out above if so. If that should - // ever change, this will need reworking. + + // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out + // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above + // if so. If that should ever change, this will need reworking. return Collections.singletonList(renameTableTask); } catch (Exception e) { throw (e instanceof SemanticException) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 09d70eb..65e1d6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -24,7 +24,6 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; public class TableHandler extends AbstractMessageHandler { @@ -37,27 +36,18 @@ throw new SemanticException("Database name cannot be null for a table load"); } try { - // TODO: why not have the below variables as static / inline seems to have no possibility of updates back here - - // no location set on repl loads - boolean isLocationSet = false; - // all repl imports are non-external - boolean isExternalSet = false; - // bootstrap loads are not partition level - boolean isPartSpecSet = false; - // repl loads are not partition level - LinkedHashMap parsedPartSpec = null; - // no location for repl imports - String parsedLocation = null; List> importTasks = new ArrayList<>(); EximUtil.SemanticAnalyzerWrapperContext x = new EximUtil.SemanticAnalyzerWrapperContext( context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log, context.nestedContext); - ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, - (context.precursor != null), parsedLocation, context.tableName, context.dbName, - parsedPartSpec, context.location, x, + + // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs. + // Also, REPL LOAD doesn't support external table and hence no location set as well. 
+ ImportSemanticAnalyzer.prepareImport(false, false, false, + (context.precursor != null), null, context.tableName, context.dbName, + null, context.location, x, databasesUpdated, tablesUpdated); return importTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index fe45788..3a8990a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -55,15 +55,15 @@ } TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, partSpec); - Task truncatePtnTask = - TaskFactory.get( + actualDbName + "." + actualTblName, partSpec, + context.eventOnlyReplicationSpec()); + Task truncatePtnTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), - context.hiveConf - ); + context.hiveConf); context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName()); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); return Collections.singletonList(truncatePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index fc024f1..93ffa29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -36,15 +36,16 @@ String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, null); + actualDbName + "." + actualTblName, + null, context.eventOnlyReplicationSpec()); Task truncateTableTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), - context.hiveConf - ); + context.hiveConf); context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName()); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); return Collections.singletonList(truncateTableTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java index 5e218c4..368db0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Map; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain.Level; /** @@ -41,6 +42,7 @@ String databaseName; Map dbProperties; PrincipalDesc ownerPrincipal; + ReplicationSpec replicationSpec; /** * For serialization only. 
@@ -48,10 +50,11 @@ public AlterDatabaseDesc() { } - public AlterDatabaseDesc(String databaseName, Map dbProps) { + public AlterDatabaseDesc(String databaseName, Map dbProps, ReplicationSpec replicationSpec) { super(); this.databaseName = databaseName; this.dbProperties = dbProps; + this.replicationSpec = replicationSpec; this.setAlterType(ALTER_DB_TYPES.ALTER_PROPERTY); } @@ -95,4 +98,15 @@ public ALTER_DB_TYPES getAlterType() { public void setAlterType(ALTER_DB_TYPES alterType) { this.alterType = alterType; } + + /** + * @return what kind of replication scope this alter is running under. + * This can result in a "ALTER IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec() { + if (replicationSpec == null) { + this.replicationSpec = new ReplicationSpec(); + } + return this.replicationSpec; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java index 6cfde18..2691faa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -131,6 +132,7 @@ List foreignKeyCols; List uniqueConstraintCols; List notNullConstraintCols; + ReplicationSpec replicationSpec; public AlterTableDesc() { } @@ -188,12 +190,17 @@ public AlterTableDesc(String tblName, HashMap partSpec, * old name of the table * @param newName * new name of the table + * @param expectView + * Flag to denote if current table can be a view + * @param replicationSpec + * Replication specification with current event ID */ - public AlterTableDesc(String oldName, String newName, boolean expectView) { + public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) { op = AlterTableTypes.RENAME; this.oldName = oldName; this.newName = newName; this.expectView = expectView; + this.replicationSpec = replicationSpec; } /** @@ -214,6 +221,17 @@ public AlterTableDesc(String name, HashMap partSpec, List partSpec, boolean expectView) { op = alterType; @@ -858,4 +880,9 @@ public void setEnvironmentContext(EnvironmentContext environmentContext) { this.environmentContext = environmentContext; } + /** + * @return what kind of replication scope this alter is running under. + * This can result in a "ALTER IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec(){ return this.replicationSpec; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java index 46b0fd6..8feeb78 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateFunctionDesc.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hive.metastore.api.ResourceUri; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain.Level; /** @@ -36,6 +37,7 @@ private String className; private boolean isTemp; private List resources; + private ReplicationSpec replicationSpec; /** * For serialization only. 
@@ -44,11 +46,12 @@ public CreateFunctionDesc() {
   }

   public CreateFunctionDesc(String functionName, boolean isTemp, String className,
-      List resources) {
+      List resources, ReplicationSpec replicationSpec) {
     this.functionName = functionName;
     this.isTemp = isTemp;
     this.className = className;
     this.resources = resources;
+    this.replicationSpec = replicationSpec;
   }

   @Explain(displayName = "name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -85,4 +88,14 @@ public void setResources(List resources) {
     this.resources = resources;
   }
+
+  /**
+   * @return what kind of replication scope this create is running under.
+   * This can result in a "CREATE IF NEWER THAN" kind of semantic
+   */
+  public ReplicationSpec getReplicationSpec() {
+    if (replicationSpec == null) {
+      this.replicationSpec = new ReplicationSpec();
+    }
+    return this.replicationSpec;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java
index 54dd374..01a5560 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DropFunctionDesc.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.plan;

 import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;

@@ -32,6 +34,7 @@
   private String functionName;
   private boolean isTemp;
+  private ReplicationSpec replicationSpec;

   /**
    * For serialization only.
@@ -39,9 +42,10 @@
   public DropFunctionDesc() {
   }

-  public DropFunctionDesc(String functionName, boolean isTemp) {
+  public DropFunctionDesc(String functionName, boolean isTemp, ReplicationSpec replicationSpec) {
     this.functionName = functionName;
     this.isTemp = isTemp;
+    this.replicationSpec = replicationSpec;
   }

   @Explain(displayName = "name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -61,4 +65,14 @@ public void setTemp(boolean isTemp) {
     this.isTemp = isTemp;
   }
+
+  /**
+   * @return what kind of replication scope this create is running under.
+   * This can result in a "DROP IF NEWER THAN" kind of semantic
+   */
+  public ReplicationSpec getReplicationSpec() {
+    if (replicationSpec == null) {
+      this.replicationSpec = new ReplicationSpec();
+    }
+    return this.replicationSpec;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
index 7523d01..ef85d13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.plan;

+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -28,10 +30,11 @@
   private static final long serialVersionUID = 1L;

-  String tableName;
-  String location;
-  LinkedHashMap oldPartSpec;
-  LinkedHashMap newPartSpec;
+  private String tableName;
+  private String location;
+  private LinkedHashMap oldPartSpec;
+  private LinkedHashMap newPartSpec;
+  private ReplicationSpec replicationSpec;

   /**
    * For serialization only.
@@ -40,8 +43,6 @@ public RenamePartitionDesc() {
   }

   /**
-   * @param dbName
-   *          database to add to.
    * @param tableName
    *          table to add to.
    * @param oldPartSpec
@@ -50,10 +51,11 @@ public RenamePartitionDesc() {
    *          new partition specification.
    */
   public RenamePartitionDesc(String tableName,
-      Map oldPartSpec, Map newPartSpec) {
+      Map oldPartSpec, Map newPartSpec, ReplicationSpec replicationSpec) {
     this.tableName = tableName;
     this.oldPartSpec = new LinkedHashMap(oldPartSpec);
     this.newPartSpec = new LinkedHashMap(newPartSpec);
+    this.replicationSpec = replicationSpec;
   }

   /**
@@ -115,4 +117,10 @@ public void setOldPartSpec(LinkedHashMap partSpec) {
   public void setNewPartSpec(LinkedHashMap partSpec) {
     this.newPartSpec = partSpec;
   }
+
+  /**
+   * @return what kind of replication scope this rename is running under.
+   * This can result in a "RENAME IF NEWER THAN" kind of semantic
+   */
+  public ReplicationSpec getReplicationSpec() { return this.replicationSpec; }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
index 90c123d..f07fe04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java
@@ -22,6 +22,7 @@
 import java.util.Map;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;

@@ -39,13 +40,15 @@
   private Path inputDir;
   private Path outputDir;
   private ListBucketingCtx lbCtx;
+  private ReplicationSpec replicationSpec;

   public TruncateTableDesc() {
   }

-  public TruncateTableDesc(String tableName, Map partSpec) {
+  public TruncateTableDesc(String tableName, Map partSpec, ReplicationSpec replicationSpec) {
     this.tableName = tableName;
     this.partSpec = partSpec;
+    this.replicationSpec = replicationSpec;
   }

   @Explain(displayName = "TableName", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -98,4 +101,10 @@ public ListBucketingCtx getLbCtx() {
   public void setLbCtx(ListBucketingCtx lbCtx) {
     this.lbCtx = lbCtx;
   }
+
+  /**
+   * @return what kind of replication scope this truncate is running under.
+   * This can result in a "TRUNCATE IF NEWER THAN" kind of semantic
+   */
+  public ReplicationSpec getReplicationSpec() { return this.replicationSpec; }
 }
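
Note on the getReplicationSpec() accessors introduced above: carrying a ReplicationSpec in each *Desc lets the load-side DDL task apply an event only if it is newer than the state already replicated to the target object (the "<OP> IF NEWER THAN" semantic described in the javadocs), so repeated or out-of-order events on an existing object become no-ops; where no spec was supplied, the lazy new ReplicationSpec() default appears intended to leave ordinary, non-replication DDL unaffected. The snippet below is a minimal, hypothetical sketch of that newer-than check, not the actual Hive DDLTask logic; the ReplEventGuard class and the parameter key name are assumptions for illustration only.

// Hypothetical illustration of the "<OP> IF NEWER THAN" guard (not Hive source code).
import java.util.Map;

final class ReplEventGuard {
  // Assumed key under which the last replicated event id is recorded on the target
  // database/table/partition parameters.
  static final String LAST_REPL_ID_KEY = "repl.last.id";

  /**
   * Returns true if the event carried by the current ReplicationSpec is newer than the
   * state already applied to the target, i.e. the DDL should run; returns false when the
   * target already reflects this event (or a later one) and the operation can be skipped.
   */
  static boolean allowReplacement(Map<String, String> targetParams, String currentEventId) {
    String lastReplId = (targetParams == null) ? null : targetParams.get(LAST_REPL_ID_KEY);
    if (lastReplId == null || currentEventId == null) {
      // No replication state recorded yet, or not a replication-scoped operation: apply it.
      return true;
    }
    return Long.parseLong(currentEventId) > Long.parseLong(lastReplId);
  }
}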