diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 5a187f4..ec41537 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -163,26 +163,54 @@ private synchronized void advanceDumpDir() { } static class Tuple { - final String replicatedDbName; - final String lastReplicationId; + final String dumpLocation; + final String lastReplId; - Tuple(String replicatedDbName, String lastReplicationId) { - this.replicatedDbName = replicatedDbName; - this.lastReplicationId = lastReplicationId; + Tuple(String dumpLocation, String lastReplId) { + this.dumpLocation = dumpLocation; + this.lastReplId = lastReplId; } } - private Tuple loadAndVerify(String dbName) throws IOException { + private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IOException { + return incrementalLoadAndVerify(dbName, null, replDbName); + } + + private Tuple incrementalLoadAndVerify(String dbName, String fromReplId, String replDbName) throws IOException { + Tuple dump = replDumpDb(dbName, fromReplId, null, null); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + return dump; + } + + private Tuple dumpDbFromLastDump(String dbName, Tuple lastDump) throws IOException { + return replDumpDb(dbName, lastDump.lastReplId, null, null); + } + + private Tuple replDumpDb(String dbName, String fromReplID, String toReplID, String limit) throws IOException { advanceDumpDir(); - run("REPL DUMP " + dbName); + String dumpCmd = "REPL DUMP " + dbName; + if (null != fromReplID) { + dumpCmd = dumpCmd + " FROM " + fromReplID; + } + if (null != toReplID) { + dumpCmd = dumpCmd + " TO " + toReplID; + } + if (null != limit) { + dumpCmd = dumpCmd + " LIMIT " + limit; + } + run(dumpCmd); String dumpLocation = getResult(0, 0); - String lastReplicationId = getResult(0, 1, true); - String replicatedDbName = dbName + "_replicated"; - run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + String lastReplId = getResult(0, 1, true); + LOG.info("Dumped to {} with id {} for command: {}", dumpLocation, lastReplId, dumpCmd); + return new Tuple(dumpLocation, lastReplId); + } + + private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException { + run("EXPLAIN REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'"); printOutput(); - run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); - verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId); - return new Tuple(replicatedDbName, lastReplicationId); + run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'"); + verifyRun("REPL STATUS " + replDbName, lastReplId); + return; } /** @@ -222,7 +250,8 @@ public void testBasic() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); - String replicatedDbName = loadAndVerify(dbName).replicatedDbName; + String replicatedDbName = dbName + "_dupe"; + bootstrapLoadAndVerify(dbName, replicatedDbName); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1); @@ -2069,6 +2098,194 @@ public void testTruncateWithCM() throws IOException { } @Test + public void 
testIncrementalRepeatEventOnExistingObject() throws IOException {
+    String testName = "incrementalRepeatEventOnExistingObject";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    // Bootstrap dump/load
+    String replDbName = dbName + "_dupe";
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+
+    // List to maintain the incremental dumps for each operation
+    List<Tuple> incrementalDumpList = new ArrayList<Tuple>();
+
+    String[] empty = new String[] {};
+    String[] unptn_data = new String[] { "ten" };
+    String[] ptn_data_1 = new String[] { "fifteen" };
+    String[] ptn_data_2 = new String[] { "seventeen" };
+
+    // INSERT EVENT to unpartitioned table
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
+    run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // ADD_PARTITION EVENT to partitioned table
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table on existing partition
+    run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // TRUNCATE_PARTITION EVENT on partitioned table
+    run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // TRUNCATE_TABLE EVENT on unpartitioned table
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // CREATE_TABLE EVENT with multiple partitions
+    run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // Replicate all the events that have happened so far
+    Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
+
+    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+    verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty);
+    verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+
+    // Load each incremental dump from the list. Each dump has only one operation.
+    for (Tuple currDump : incrementalDumpList) {
+      // Load the incremental dump and ensure it is a no-op: the last repl ID must remain the same
+      loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
+
+      // Verify that the data remains intact even after an already-applied event is re-applied on existing objects
+      verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty);
+      verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty);
+      verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2);
+      verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty);
+      verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2);
+    }
+  }
+
+  @Test
+  public void testIncrementalRepeatEventOnMissingObject() throws IOException {
+    String testName = "incrementalRepeatEventOnMissingObject";
+    String dbName = createDB(testName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+
+    // Bootstrap dump/load
+    String replDbName = dbName + "_dupe";
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+
+    // List to maintain the incremental dumps for each operation
+    List<Tuple> incrementalDumpList = new ArrayList<Tuple>();
+
+    String[] empty = new String[] {};
+    String[] unptn_data = new String[] { "ten" };
+    String[] ptn_data_1 = new String[] { "fifteen" };
+    String[] ptn_data_2 = new String[] { "seventeen" };
+
+    // INSERT EVENT to unpartitioned table
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    Tuple replDump = dumpDbFromLastDump(dbName, bootstrapDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // ADD_PARTITION EVENT to partitioned table
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table on existing partition
+    run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // TRUNCATE_PARTITION EVENT on partitioned table
+    run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // TRUNCATE_TABLE EVENT on unpartitioned table
+    run("TRUNCATE TABLE " + dbName + ".unptned");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // CREATE_TABLE EVENT on partitioned table
+    run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
+    run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
+    run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // DROP_PARTITION EVENT to partitioned table
+    run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // RENAME_PARTITION EVENT to partitioned table
+    run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // RENAME_TABLE EVENT to unpartitioned table
+    run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // DROP_TABLE EVENT to partitioned table
+    run("DROP TABLE " + dbName + ".ptned_tmp");
+    replDump = dumpDbFromLastDump(dbName, replDump);
+    incrementalDumpList.add(replDump);
+
+    // Replicate all the events that have happened so far
+    Tuple incrDump = incrementalLoadAndVerify(dbName, bootstrapDump.lastReplId, replDbName);
+
+    verifyIfTableNotExist(replDbName, "unptned");
+    verifyIfTableNotExist(replDbName, "ptned_tmp");
+    verifyIfTableExist(replDbName, "unptned_new");
+    verifyIfTableExist(replDbName, "ptned");
+    verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")));
+    verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")));
+    verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")));
+
+    // Load each incremental dump from the list. Each dump has only one operation.
+    for (Tuple currDump : incrementalDumpList) {
+      // Load the current incremental dump and ensure it is a no-op: the last repl ID must remain the same
+      loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
+
+      // Verify that the state remains intact even after an already-applied event is re-applied on missing objects
+      verifyIfTableNotExist(replDbName, "unptned");
+      verifyIfTableNotExist(replDbName, "ptned_tmp");
+      verifyIfTableExist(replDbName, "unptned_new");
+      verifyIfTableExist(replDbName, "ptned");
+      verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")));
+      verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")));
+      verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")));
+    }
+  }
+
+  @Test
   public void testStatus() throws IOException {
     // first test ReplStateMap functionality
     Map cmap = new ReplStateMap();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 87928ee..d688a77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -934,6 +934,11 @@ private int alterDatabase(Hive db, AlterDatabaseDesc alterDbDesc) throws HiveExc
     case ALTER_PROPERTY:
       Map newParams = alterDbDesc.getDatabaseProperties();
       Map params = database.getParameters();
+
+      if (!alterDbDesc.getReplicationSpec().allowEventReplacementInto(database)) {
+        return 0; // no replacement, the existing database state is newer than our update.
+ } + // if both old and new params are not null, merge them if (params != null && newParams != null) { params.putAll(newParams); @@ -1117,10 +1122,16 @@ private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws Hiv * @throws HiveException */ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) throws HiveException { + String tableName = renamePartitionDesc.getTableName(); + LinkedHashMap oldPartSpec = renamePartitionDesc.getOldPartSpec(); - Table tbl = db.getTable(renamePartitionDesc.getTableName()); + if (!allowOperationInReplicationScope(db, tableName, oldPartSpec, renamePartitionDesc.getReplicationSpec())) { + // no rename, the table is missing either due to drop/rename which follows the current rename. + // or the existing table is newer than our update. + return 0; + } - LinkedHashMap oldPartSpec = renamePartitionDesc.getOldPartSpec(); + Table tbl = db.getTable(tableName); Partition oldPart = db.getPartition(tbl, oldPartSpec, false); if (oldPart == null) { String partName = FileUtils.makePartName(new ArrayList(oldPartSpec.keySet()), @@ -1131,8 +1142,7 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th Partition part = db.getPartition(tbl, oldPartSpec, false); part.setValues(renamePartitionDesc.getNewPartSpec()); db.renamePartition(tbl, oldPartSpec, part); - Partition newPart = db - .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false); + Partition newPart = db.getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false); work.getInputs().add(new ReadEntity(oldPart)); // We've already obtained a lock on the table, don't lock the partition too addIfAbsentByName(new WriteEntity(newPart, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -3559,6 +3569,12 @@ static StringBuilder appendNonNull(StringBuilder builder, Object value, boolean * Throws this exception if an unexpected error occurs. */ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { + if (!allowOperationInReplicationScope(db, alterTbl.getOldName(), null, alterTbl.getReplicationSpec())) { + // no alter, the table is missing either due to drop/rename which follows the alter. + // or the existing table is newer than our update. + return 0; + } + // alter the table Table tbl = db.getTable(alterTbl.getOldName()); @@ -4703,6 +4719,12 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H String tableName = truncateTableDesc.getTableName(); Map partSpec = truncateTableDesc.getPartSpec(); + if (!allowOperationInReplicationScope(db, tableName, partSpec, truncateTableDesc.getReplicationSpec())) { + // no truncate, the table is missing either due to drop/rename which follows the truncate. + // or the existing table is newer than our update. + return 0; + } + try { db.truncateTable(tableName, partSpec); } catch (Exception e) { @@ -4829,6 +4851,41 @@ private void makeLocationQualified(Database database) throws HiveException { } } + /** + * Validate if the given table/partition is eligible for update + * + * @param db Database. 
+   * @param tableName Table name of format db.table
+   * @param partSpec Partition spec for the partition
+   * @param replicationSpec Replication specification
+   *
+   * @return true if the operation is allowed
+   * @throws HiveException
+   */
+  private boolean allowOperationInReplicationScope(Hive db, String tableName, Map<String, String> partSpec,
+      ReplicationSpec replicationSpec) throws HiveException {
+    if ((null != replicationSpec) && (replicationSpec.isInReplicationScope())) {
+      // If the table/partition exists and is older than the incoming event, then apply the operation, else noop it.
+      Table existingTable = db.getTable(tableName, false);
+      if ((existingTable == null) || (!replicationSpec.allowEventReplacementInto(existingTable))) {
+        // The table is missing, either due to a drop/rename that follows this operation in the event stream,
+        // or the existing table is newer than our update. So, don't allow the update.
+        return false;
+      }
+
+      // Table exists and is older than the update. Now, ensure the update is allowed on the partition as well.
+      if (partSpec != null) {
+        Partition existingPtn = db.getPartition(existingTable, partSpec, false);
+        if ((existingPtn == null) || (!replicationSpec.allowEventReplacementInto(existingPtn))) {
+          // No operation allowed:
+          // either the partition is missing or the existing partition state is newer than our update.
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   public static boolean doesTableNeedLocation(Table tbl) {
     // TODO: If we are ok with breaking compatibility of existing 3rd party StorageHandlers,
     // this method could be moved to the HiveStorageHandler interface.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index e9a4ff0..fec8bf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -697,7 +697,7 @@ private void analyzeAlterDatabaseProperties(ASTNode ast) throws SemanticExceptio
         throw new SemanticException("Unrecognized token in CREATE DATABASE statement");
       }
     }
-    AlterDatabaseDesc alterDesc = new AlterDatabaseDesc(dbName, dbProps);
+    AlterDatabaseDesc alterDesc = new AlterDatabaseDesc(dbName, dbProps, null);
     addAlterDbDesc(alterDesc);
   }
@@ -946,7 +946,7 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException {
       }
     }
-    TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec);
+    TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec, null);
     DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), truncateTblDesc);
     Task truncateTask = TaskFactory.get(ddlWork, conf);
@@ -2622,7 +2622,7 @@ private void analyzeAlterTableRename(String[] source, ASTNode ast, boolean expec
     String sourceName = getDotName(source);
     String targetName = getDotName(target);
-    AlterTableDesc alterTblDesc = new AlterTableDesc(sourceName, targetName, expectView);
+    AlterTableDesc alterTblDesc = new AlterTableDesc(sourceName, targetName, expectView, null);
     addInputsOutputsAlterTable(sourceName, null, alterTblDesc);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
@@ -2741,7 +2741,7 @@ private void analyzeAlterTableRenamePart(ASTNode ast, String tblName,
     partSpecs.add(oldPartSpec);
     partSpecs.add(newPartSpec);
     addTablePartsOutputs(tab, partSpecs, WriteEntity.WriteType.DDL_EXCLUSIVE);
-    RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc(tblName, oldPartSpec, newPartSpec);
+
RenamePartitionDesc renamePartitionDesc = new RenamePartitionDesc(tblName, oldPartSpec, newPartSpec, null); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), renamePartitionDesc), conf)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index dc86942..4674058 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -792,21 +792,6 @@ private static void createReplImportTasks( Task dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; - if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){ - // If destination table exists, but is partitioned, and we think we're writing to an unpartitioned - // or if destination table exists, but is unpartitioned and we think we're writing to a partitioned - // table, then this can only happen because there are drops in the queue that are yet to be processed. - // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we - // drop and re-create. - if (replicationSpec.allowReplacementInto(table)){ - dr = dropTableTask(table, x); - lockType = WriteEntity.WriteType.DDL_EXCLUSIVE; - table = null; // null it out so we go into the table re-create flow. - } else { - return; // noop out of here. - } - } - // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying // to create tasks to create a table inside a db that as-of-now does not exist, but there is @@ -818,6 +803,20 @@ private static void createReplImportTasks( throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName())); } } + + if (table != null) { + if (!replicationSpec.allowReplacementInto(table)) { + // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + return; + } + } else { + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb))) { + // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + return; + } + } + if (tblDesc.getLocation() == null) { if (!waitOnPrecursor){ tblDesc.setLocation(wh.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString()); @@ -832,16 +831,15 @@ private static void createReplImportTasks( } } - /* Note: In the following section, Metadata-only import handling logic is - interleaved with regular repl-import logic. The rule of thumb being - followed here is that MD-only imports are essentially ALTERs. They do - not load data, and should not be "creating" any metadata - they should - be replacing instead. The only place it makes sense for a MD-only import - to create is in the case of a table that's been dropped and recreated, - or in the case of an unpartitioned table. In all other cases, it should - behave like a noop or a pure MD alter. - */ - + /* Note: In the following section, Metadata-only import handling logic is + interleaved with regular repl-import logic. The rule of thumb being + followed here is that MD-only imports are essentially ALTERs. They do + not load data, and should not be "creating" any metadata - they should + be replacing instead. 
The only place it makes sense for a MD-only import + to create is in the case of a table that's been dropped and recreated, + or in the case of an unpartitioned table. In all other cases, it should + behave like a noop or a pure MD alter. + */ if (table == null) { // Either we're dropping and re-creating, or the table didn't exist, and we're creating. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 961561d..5787d70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -668,11 +668,11 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { for (String tableName : tablesUpdated.keySet()){ // weird - AlterTableDesc requires a HashMap to update props instead of a Map. HashMap mapProp = new HashMap(); - mapProp.put( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - tablesUpdated.get(tableName).toString()); + String eventId = tablesUpdated.get(tableName).toString(); + + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); AlterTableDesc alterTblDesc = new AlterTableDesc( - AlterTableDesc.AlterTableTypes.ADDPROPS, null, false); + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(eventId, eventId)); alterTblDesc.setProps(mapProp); alterTblDesc.setOldName(tableName); Task updateReplIdTask = TaskFactory.get( @@ -682,10 +682,10 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } for (String dbName : dbsUpdated.keySet()){ Map mapProp = new HashMap(); - mapProp.put( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), - dbsUpdated.get(dbName).toString()); - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp); + String eventId = dbsUpdated.get(dbName).toString(); + + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(eventId, eventId)); Task updateReplIdTask = TaskFactory.get( new DDLWork(inputs, outputs, alterDbDesc), conf); taskChainTail.addDependentTask(updateReplIdTask); @@ -786,7 +786,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) Task dbRootTask = null; if (existEmptyDb(dbName)) { - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters()); + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters(), null); dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); } else { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 1ea608b..fdb27ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -19,6 +19,7 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -134,6 +135,10 @@ public ReplicationSpec(){ this((ASTNode)null); } + public ReplicationSpec(String fromId, String toId) { + this(true, false, fromId, toId, false, true, false); + } + public ReplicationSpec(boolean 
isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy, boolean isReplace) { @@ -189,18 +194,18 @@ public static boolean allowReplacement(String currReplState, String replacementR } // First try to extract a long value from the strings, and compare them. - // If oldReplState is less-than or equal to newReplState, allow. + // If oldReplState is less-than newReplState, allow. long currReplStateLong = Long.parseLong(currReplState.replaceAll("\\D","")); long replacementReplStateLong = Long.parseLong(replacementReplState.replaceAll("\\D","")); if ((currReplStateLong != 0) || (replacementReplStateLong != 0)){ - return ((currReplStateLong - replacementReplStateLong) <= 0); + return ((currReplStateLong - replacementReplStateLong) < 0); } // If the long value of both is 0, though, fall back to lexical comparison. // Lexical comparison according to locale will suffice for now, future might add more logic - return (collator.compare(currReplState.toLowerCase(), replacementReplState.toLowerCase()) <= 0); + return (collator.compare(currReplState.toLowerCase(), replacementReplState.toLowerCase()) < 0); } /** @@ -236,6 +241,14 @@ public boolean allowReplacementInto(Table table) { } /** + * Determines if a current replication object(current state of dump) is allowed to + * replicate-replace-into a given database + */ + public boolean allowReplacementInto(Database database) { + return allowReplacement(getLastReplicatedStateFromParameters(database.getParameters()),this.getCurrentReplicationState()); + } + + /** * Determines if a current replication event specification is allowed to * replicate-replace-into a given table */ @@ -244,6 +257,15 @@ public boolean allowEventReplacementInto(Table table) { } /** + * Determines if a current replication event specification is allowed to + * replicate-replace-into a given database. 
+ * This is invoked only for alter database properties to set the repl.last.id + */ + public boolean allowEventReplacementInto(Database database) { + return allowReplacement(getLastReplicatedStateFromParameters(database.getParameters()),this.getReplicationState()); + } + + /** * Returns a predicate filter to filter an Iterable to return all partitions * that the current replication event specification is allowed to replicate-replace-into */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index 0580546..de47b22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -34,9 +34,7 @@ EVENT_CREATE_TABLE("EVENT_CREATE_TABLE") { @Override - public MessageHandler handler() { - return new TableHandler(); - } + public MessageHandler handler() { return new TableHandler(); } }, EVENT_ADD_PARTITION("EVENT_ADD_PARTITION") { @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java index 95e51e4..0e9d8c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java @@ -58,10 +58,6 @@ ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException { String eventId = forContext.dmd.getEventTo().toString(); - return replicationSpec(eventId, eventId); - } - - private ReplicationSpec replicationSpec(String fromId, String toId) throws SemanticException { - return new ReplicationSpec(true, false, fromId, toId, false, true, false); + return new ReplicationSpec(eventId, eventId); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 627fb46..75b564d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -59,14 +59,11 @@ : new SemanticException("Error reading message members", e); } - RenamePartitionDesc renamePtnDesc = - new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec); + RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec, + eventOnlyReplicationSpec(context)); Task renamePtnTask = TaskFactory.get( - new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf - ); - context.log - .debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, - newPartSpec); + new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf); + context.log.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); tablesUpdated.put(tableName, context.dmd.getEventTo()); return Collections.singletonList(renamePtnTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 10f0753..552e01b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -57,17 +57,17 @@ String oldName = oldDbName + "." + msg.getTableObjBefore().getTableName(); String newName = newDbName + "." + msg.getTableObjAfter().getTableName(); - AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false); + AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false, + eventOnlyReplicationSpec(context)); Task renameTableTask = TaskFactory.get( - new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf - ); - context.log.debug( - "Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName - ); + new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf); + context.log.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName); + // oldDbName and newDbName *will* be the same if we're here databasesUpdated.put(newDbName, context.dmd.getEventTo()); tablesUpdated.remove(oldName); tablesUpdated.put(newName, context.dmd.getEventTo()); + // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated // However, we explicitly don't support repl of that sort, and error out above if so. If that should // ever change, this will need reworking. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 09d70eb..eeb5cd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -37,27 +37,18 @@ throw new SemanticException("Database name cannot be null for a table load"); } try { - // TODO: why not have the below variables as static / inline seems to have no possibility of updates back here - - // no location set on repl loads - boolean isLocationSet = false; - // all repl imports are non-external - boolean isExternalSet = false; - // bootstrap loads are not partition level - boolean isPartSpecSet = false; - // repl loads are not partition level - LinkedHashMap parsedPartSpec = null; - // no location for repl imports - String parsedLocation = null; List> importTasks = new ArrayList<>(); EximUtil.SemanticAnalyzerWrapperContext x = new EximUtil.SemanticAnalyzerWrapperContext( context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log, context.nestedContext); - ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, - (context.precursor != null), parsedLocation, context.tableName, context.dbName, - parsedPartSpec, context.location, x, + + // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs. + // Also, REPL LOAD doesn't support external table and hence no location set as well. 
+ ImportSemanticAnalyzer.prepareImport(false, false, false, + (context.precursor != null), null, context.tableName, context.dbName, + null, context.location, x, databasesUpdated, tablesUpdated); return importTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index fe45788..b8d6a99 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -54,16 +54,15 @@ } } - TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, partSpec); - Task truncatePtnTask = - TaskFactory.get( + TruncateTableDesc truncateTableDesc = new TruncateTableDesc(actualDbName + "." + actualTblName, partSpec, + eventOnlyReplicationSpec(context)); + Task truncatePtnTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), - context.hiveConf - ); + context.hiveConf); context.log.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName()); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); return Collections.singletonList(truncatePtnTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index fc024f1..e6f8b71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -35,16 +35,16 @@ String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; - TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, null); + TruncateTableDesc truncateTableDesc = new TruncateTableDesc(actualDbName + "." + actualTblName, + null, eventOnlyReplicationSpec(context)); Task truncateTableTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, truncateTableDesc), - context.hiveConf - ); + context.hiveConf); context.log.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName()); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + tablesUpdated.put(actualDbName + "." + actualTblName, context.dmd.getEventTo()); return Collections.singletonList(truncateTableTask); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java index 5e218c4..77c7b61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterDatabaseDesc.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Map; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain.Level; /** @@ -41,6 +42,7 @@ String databaseName; Map dbProperties; PrincipalDesc ownerPrincipal; + ReplicationSpec replicationSpec; /** * For serialization only. 
@@ -48,10 +50,11 @@ public AlterDatabaseDesc() { } - public AlterDatabaseDesc(String databaseName, Map dbProps) { + public AlterDatabaseDesc(String databaseName, Map dbProps, ReplicationSpec replicationSpec) { super(); this.databaseName = databaseName; this.dbProperties = dbProps; + this.replicationSpec = replicationSpec; this.setAlterType(ALTER_DB_TYPES.ALTER_PROPERTY); } @@ -95,4 +98,16 @@ public ALTER_DB_TYPES getAlterType() { public void setAlterType(ALTER_DB_TYPES alterType) { this.alterType = alterType; } + + /** + * @return what kind of replication scope this alter is running under. + * This can result in a "ALTER IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec() { + if (replicationSpec == null) { + this.replicationSpec = new ReplicationSpec(); + } + + return this.replicationSpec; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java index 6cfde18..2691faa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -131,6 +132,7 @@ List foreignKeyCols; List uniqueConstraintCols; List notNullConstraintCols; + ReplicationSpec replicationSpec; public AlterTableDesc() { } @@ -188,12 +190,17 @@ public AlterTableDesc(String tblName, HashMap partSpec, * old name of the table * @param newName * new name of the table + * @param expectView + * Flag to denote if current table can be a view + * @param replicationSpec + * Replication specification with current event ID */ - public AlterTableDesc(String oldName, String newName, boolean expectView) { + public AlterTableDesc(String oldName, String newName, boolean expectView, ReplicationSpec replicationSpec) { op = AlterTableTypes.RENAME; this.oldName = oldName; this.newName = newName; this.expectView = expectView; + this.replicationSpec = replicationSpec; } /** @@ -214,6 +221,17 @@ public AlterTableDesc(String name, HashMap partSpec, List partSpec, boolean expectView) { op = alterType; @@ -858,4 +880,9 @@ public void setEnvironmentContext(EnvironmentContext environmentContext) { this.environmentContext = environmentContext; } + /** + * @return what kind of replication scope this alter is running under. 
+ * This can result in a "ALTER IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec(){ return this.replicationSpec; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java index 7523d01..ef85d13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; + import java.io.Serializable; import java.util.LinkedHashMap; import java.util.Map; @@ -28,10 +30,11 @@ private static final long serialVersionUID = 1L; - String tableName; - String location; - LinkedHashMap oldPartSpec; - LinkedHashMap newPartSpec; + private String tableName; + private String location; + private LinkedHashMap oldPartSpec; + private LinkedHashMap newPartSpec; + private ReplicationSpec replicationSpec; /** * For serialization only. @@ -40,8 +43,6 @@ public RenamePartitionDesc() { } /** - * @param dbName - * database to add to. * @param tableName * table to add to. * @param oldPartSpec @@ -50,10 +51,11 @@ public RenamePartitionDesc() { * new partition specification. */ public RenamePartitionDesc(String tableName, - Map oldPartSpec, Map newPartSpec) { + Map oldPartSpec, Map newPartSpec, ReplicationSpec replicationSpec) { this.tableName = tableName; this.oldPartSpec = new LinkedHashMap(oldPartSpec); this.newPartSpec = new LinkedHashMap(newPartSpec); + this.replicationSpec = replicationSpec; } /** @@ -115,4 +117,10 @@ public void setOldPartSpec(LinkedHashMap partSpec) { public void setNewPartSpec(LinkedHashMap partSpec) { this.newPartSpec = partSpec; } + + /** + * @return what kind of replication scope this rename is running under. + * This can result in a "RENAME IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec() { return this.replicationSpec; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java index 90c123d..f07fe04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TruncateTableDesc.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -39,13 +40,15 @@ private Path inputDir; private Path outputDir; private ListBucketingCtx lbCtx; + private ReplicationSpec replicationSpec; public TruncateTableDesc() { } - public TruncateTableDesc(String tableName, Map partSpec) { + public TruncateTableDesc(String tableName, Map partSpec, ReplicationSpec replicationSpec) { this.tableName = tableName; this.partSpec = partSpec; + this.replicationSpec = replicationSpec; } @Explain(displayName = "TableName", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -98,4 +101,10 @@ public ListBucketingCtx getLbCtx() { public void setLbCtx(ListBucketingCtx lbCtx) { this.lbCtx = lbCtx; } + + /** + * @return what kind of replication scope this truncate is running under. + * This can result in a "TRUNCATE IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec() { return this.replicationSpec; } }
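The behavioral core of this patch is the change in ReplicationSpec.allowReplacement() from a "newer or equal" to a "strictly newer" comparison of repl state IDs: an event whose ID is equal to (or older than) the repl.last.id already recorded on the target object is treated as already applied and becomes a no-op, which is what the two new tests rely on when they re-load single-event incremental dumps. Below is a minimal standalone sketch of that check, assuming purely numeric replication state IDs; the class name, the main() driver, and the omission of the lexical-comparison fallback are illustrative simplifications, not the Hive API.

// Minimal sketch, assuming numeric repl state IDs; simplified from the patched
// ReplicationSpec.allowReplacement(), without its lexical-comparison fallback.
public class AllowReplacementSketch {

  static boolean allowReplacement(String currReplState, String replacementReplState) {
    if (currReplState == null || currReplState.isEmpty()) {
      return true; // target has no repl.last.id yet, so any event may apply
    }
    long curr = Long.parseLong(currReplState.replaceAll("\\D", ""));
    long replacement = Long.parseLong(replacementReplState.replaceAll("\\D", ""));
    // Strictly newer only: an equal or older event becomes a no-op, so re-applying
    // an already-applied event is idempotent.
    return curr < replacement;
  }

  public static void main(String[] args) {
    System.out.println(allowReplacement("100", "101")); // true  -> apply the event
    System.out.println(allowReplacement("101", "101")); // false -> repeated event, no-op
    System.out.println(allowReplacement("102", "101")); // false -> older event, no-op
  }
}

On top of this check, the new DDLTask.allowOperationInReplicationScope() additionally no-ops an event whose target table or partition no longer exists, covering the case where a later DROP/RENAME in the same event stream has already removed the object.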