diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 1f41d46782..0bc7bb3cb2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,7 +395,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
-        null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
         Collections.emptyList());
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 7c1d01024c..e51814321c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -37,6 +38,7 @@ import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
 
 /**
  * Tests Table level replication scenarios.
@@ -115,6 +117,14 @@ private String replicateAndVerify(String replPolicy, String lastReplId,
                                     List<String> dumpWithClause,
                                     List<String> loadWithClause,
                                     String[] expectedTables) throws Throwable {
+    return replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, loadWithClause, null, expectedTables);
+  }
+
+  private String replicateAndVerify(String replPolicy, String oldReplPolicy, String lastReplId,
+                                    List<String> dumpWithClause,
+                                    List<String> loadWithClause,
+                                    String[] bootstrappedTables,
+                                    String[] expectedTables) throws Throwable {
     if (dumpWithClause == null) {
       dumpWithClause = new ArrayList<>();
     }
@@ -126,8 +136,11 @@ private String replicateAndVerify(String replPolicy, String lastReplId,
     if (lastReplId == null) {
       replica.run("drop database if exists " + replicatedDbName + " cascade");
     }
-    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
-        .dump(replPolicy, lastReplId, dumpWithClause);
+    WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause);
+
+    if (oldReplPolicy != null) {
+      verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
+    }
 
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
@@ -136,6 +149,31 @@ private String replicateAndVerify(String replPolicy, String lastReplId,
     return tuple.lastReplicationId;
   }
 
+  private void verifyBootstrapDirInIncrementalDump(String dumpLocation, String[] bootstrappedTables)
+      throws Throwable {
+    // The _bootstrap directory should be created as bootstrap is enabled on some tables in this incremental dump.
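+    // Editor's illustration (not part of the original patch): for a database "srcdb" with tables
+    // t2 and t3 bootstrapped during the incremental dump, the layout verified below is:
+    //   <dumpLocation>/_bootstrap/srcdb/t2
+    //   <dumpLocation>/_bootstrap/srcdb/t3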
+    Path dumpPath = new Path(dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME);
+
+    // If there is nothing to be bootstrapped.
+    if (bootstrappedTables.length == 0) {
+      Assert.assertFalse(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+      return;
+    }
+
+    Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+
+    // Check if the DB dump path has any tables other than the ones listed in bootstrappedTables.
+    Path dbPath = new Path(dumpPath, primaryDbName);
+    FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath);
+    Assert.assertEquals(bootstrappedTables.length, fileStatuses.length);
+
+    // E.g. _bootstrap/<dbName>/t2, _bootstrap/<dbName>/t3, etc.
+    for (String tableName : bootstrappedTables) {
+      Path tblPath = new Path(dbPath, tableName);
+      Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
+    }
+  }
+
   @Test
   public void testBasicBootstrapWithIncludeList() throws Throwable {
     String[] originalNonAcidTables = new String[] {"t1", "t2" };
@@ -197,7 +235,7 @@ public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable {
   }
 
   @Test
-  public void testReplDumpWithIncorrectTablePolicy() throws Throwable {
+  public void testIncorrectTablePolicyInReplDump() throws Throwable {
     String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" };
     createTables(originalTables, CreateTableType.NON_ACID);
 
@@ -224,8 +262,39 @@ public void testReplDumpWithIncorrectTablePolicy() throws Throwable {
       Assert.assertTrue(failed);
     }
 
+    // Test incremental replication with invalid replication policies in the REPLACE clause.
+    String replPolicy = primaryDbName;
+    WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+    String lastReplId = tupleBootstrap.lastReplicationId;
+    for (String oldReplPolicy : invalidReplPolicies) {
+      failed = false;
+      try {
+        replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, null, replicatedTables);
+      } catch (Exception ex) {
+        LOG.info("Got exception: {}", ex.getMessage());
+        Assert.assertTrue(ex instanceof ParseException);
+        failed = true;
+      }
+      Assert.assertTrue(failed);
+    }
+
+    // Replace with a replication policy having a different DB name.
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + "_dupe.['t1+'].['t1']";
+    failed = false;
+    try {
+      replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, null, replicatedTables);
+    } catch (Exception ex) {
+      LOG.info("Got exception: {}", ex.getMessage());
+      Assert.assertTrue(ex instanceof SemanticException);
+      failed = true;
+    }
+    Assert.assertTrue(failed);
+
     // Invalid pattern where we didn't enclose table pattern within single or double quotes.
-    String replPolicy = primaryDbName + ".[t1].[t2]";
+    replPolicy = primaryDbName + ".[t1].[t2]";
     failed = false;
     try {
       replicateAndVerify(replPolicy, null, null, null, replicatedTables);
@@ -263,7 +332,14 @@ public void testCaseInSensitiveNatureOfReplPolicy() throws Throwable {
     // Replicate and verify if 2 tables are replicated as per policy.
     String replPolicy = primaryDbName.toUpperCase() + ".['.*a1+', 'cc3', 'B2'].['AA1+', 'b2']";
     String[] replicatedTables = new String[] {"a1", "cc3" };
-    replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+    String lastReplId = replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+
+    // Test the case-insensitive nature of the REPLACE clause as well.
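+    // Editor's note: DB and table names in the policy are matched case-insensitively, so the
+    // include pattern 'B2' below matches table b2, and 'AA1+' excludes aa1.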
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['.*a1+', 'cc3', 'B2'].['AA1+']";
+    replicatedTables = new String[] {"a1", "b2", "cc3" };
+    String[] bootstrappedTables = new String[] {"b2" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, bootstrappedTables, replicatedTables);
   }
 
   @Test
@@ -335,7 +411,8 @@ public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList
     );
     String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']";
     String[] bootstrapReplicatedTables = new String[] {"b2" };
-    String lastReplId = replicateAndVerify(replPolicy, null, dumpWithClause, loadWithClause, bootstrapReplicatedTables);
+    String lastReplId = replicateAndVerify(replPolicy, null,
+        dumpWithClause, loadWithClause, bootstrapReplicatedTables);
 
     // Enable external tables replication and bootstrap in incremental phase.
     String[] incrementalReplicatedTables = new String[] {"a2", "b2" };
@@ -357,4 +434,120 @@ public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList
         .run("show tables")
         .verifyResults(incrementalReplicatedTables);
   }
+
+  @Test
+  public void testBasicReplaceReplPolicy() throws Throwable {
+    String[] originalNonAcidTables = new String[] {"t1", "t2" };
+    String[] originalFullAcidTables = new String[] {"t3", "t4" };
+    String[] originalMMAcidTables = new String[] {"t5" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify if only 2 tables are replicated to target.
+    String replPolicy = primaryDbName + ".['t1', 't4']";
+    String oldReplPolicy = null;
+    String[] replicatedTables = new String[] {"t1", "t4" };
+    String lastReplId = replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+
+    // Exclude t4 and include t3, t6.
+    createTables(new String[] {"t6" }, CreateTableType.MM_ACID);
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['t1', 't3', 't6']";
+    replicatedTables = new String[] {"t1", "t3", "t6" };
+    String[] bootstrappedTables = new String[] {"t3", "t6" };
+    lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+        null, null, bootstrappedTables, replicatedTables);
+
+    // Convert to full DB repl policy. All tables should be included.
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName;
+    replicatedTables = new String[] {"t1", "t2", "t3", "t4", "t5", "t6" };
+    bootstrappedTables = new String[] {"t2", "t4", "t5" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+        null, null, bootstrappedTables, replicatedTables);
+
+    // Convert to regex that excludes t3, t4 and t5.
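+    // Editor's note: the policy form is <db>.['<include regex>'].['<exclude regex>']; '.*?'
+    // includes every table and 't[3-5]+' then filters out t3, t4 and t5. Nothing is bootstrapped
+    // because the surviving tables t1, t2 and t6 were already being replicated.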
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['.*?'].['t[3-5]+']";
+    replicatedTables = new String[] {"t1", "t2", "t6" };
+    bootstrappedTables = new String[]{};
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+        null, null, bootstrappedTables, replicatedTables);
+  }
+
+  @Test
+  public void testReplacePolicyOnBootstrapAcidTablesIncrementalPhase() throws Throwable {
+    String[] originalNonAcidTables = new String[] {"a1", "b2" };
+    String[] originalFullAcidTables = new String[] {"a2", "b1" };
+    String[] originalMMAcidTables = new String[] {"a3", "a4" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify if only non-acid tables are replicated to target.
+    List<String> dumpWithoutAcidClause = Collections.singletonList(
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+    String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a4']";
+    String[] bootstrapReplicatedTables = new String[] {"a1", "b2" };
+    String lastReplId = replicateAndVerify(replPolicy, null,
+        dumpWithoutAcidClause, null, bootstrapReplicatedTables);
+
+    // Enable acid tables for replication. Also, replace the replication policy to exclude "a3"
+    // instead of "a4", and also exclude "b2".
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a3', 'b2']";
+    List<String> dumpWithAcidBootstrapClause = Arrays.asList(
+        "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+    String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a4", "b1" };
+    String[] bootstrappedTables = new String[] {"a2", "a4", "b1" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+        dumpWithAcidBootstrapClause, null, bootstrappedTables, incrementalReplicatedTables);
+  }
+
+  @Test
+  public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws Throwable {
+    String[] originalAcidTables = new String[] {"a1", "b2" };
+    String[] originalExternalTables = new String[] {"a2", "b1", "c3" };
+    createTables(originalAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+
+    // Bootstrap should exclude external tables.
+    List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"
+    );
+    String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']";
+    String[] bootstrapReplicatedTables = new String[] {"b2" };
+    String lastReplId = replicateAndVerify(replPolicy, null,
+        dumpWithClause, loadWithClause, bootstrapReplicatedTables);
+
+    // Enable external tables replication and bootstrap in incremental phase. Also, replace the
+    // replication policy to exclude tables with prefix "b".
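+    // Editor's note: a1 is bootstrapped here even though it matched the old policy's include
+    // pattern, because the old policy explicitly excluded it; a2 and c3 are bootstrapped as
+    // external tables being replicated for the first time.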
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['b[0-9]+']";
+    String[] incrementalReplicatedTables = new String[] {"a1", "a2", "c3" };
+    String[] bootstrappedTables = new String[] {"a1", "a2", "c3" };
+    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause);
+
+    // The _external_tables_file info should be created as external tables are to be replicated.
+    Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
+        .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+    // Verify that the external table info contains tables "a2" and "c3".
+    ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c3"),
+        new Path(tuple.dumpLocation, FILE_NAME));
+
+    // Verify if the expected tables are bootstrapped.
+    verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(incrementalReplicatedTables);
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index cdf90711a3..6326bc34f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -265,6 +265,18 @@ Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptio
     return dump(dumpCommand);
   }
 
+  Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, List<String> withClauseOptions)
+      throws Throwable {
+    String dumpCommand =
+        "REPL DUMP " + replPolicy
+        + (oldReplPolicy == null ? "" : " REPLACE " + oldReplPolicy)
"" : " FROM " + lastReplicationId); + if (!withClauseOptions.isEmpty()) { + dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return dump(dumpCommand); + } + Tuple dump(String dumpCommand) throws Throwable { advanceDumpDir(); run(dumpCommand); @@ -303,8 +315,6 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation, List values) throws SemanticException { for (String s : values) { LOG.debug(" > " + s); } - Utils.writeOutput(values, new Path(work.resultTempPath), conf); + Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf); } /** @@ -152,22 +152,27 @@ private boolean shouldDumpExternalTableLocation() { } private boolean shouldExamineTablesToDump() { - return shouldDumpExternalTableLocation() || - conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES); + return (work.oldReplScope != null) + || shouldDumpExternalTableLocation() + || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES); } private boolean shouldBootstrapDumpTable(Table table) { - if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) && - TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) + && TableType.EXTERNAL_TABLE.equals(table.getTableType())) { return true; } - if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) && - AcidUtils.isTransactionalTable(table)) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) + && AcidUtils.isTransactionalTable(table)) { return true; } - return false; + // If replication policy is replaced with new included/excluded tables list, then tables which + // are not included in old policy but included in new policy should be bootstrapped along with + // the current incremental replication dump. + // Note: If control reaches here, it means, table is included in new replication policy. + return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, table.getTableName()); } private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { @@ -181,7 +186,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // bootstrap (See bootstrapDump() for more details. Only difference here is instead of // waiting for the concurrent transactions to finish, we start dumping the incremental events // and wait only for the remaining time if any. - if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) { + if (needBootstrapAcidTablesDuringIncrementalDump()) { bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0); LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", @@ -233,24 +238,23 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive replLogger.endLog(lastReplId.toString()); LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); - Utils.writeOutput( - Arrays.asList( - "incremental", - String.valueOf(work.eventFrom), - String.valueOf(lastReplId) - ), - dmd.getDumpFilePath(), conf); dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot); - dmd.write(); - // If required wait more for any transactions open at the time of starting the ACID bootstrap. 
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
-      assert (waitUntilTime > 0);
-      validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+    // If the repl policy is changed (oldReplScope is set), then pass the current replication policy,
+    // so that REPL LOAD would drop the tables which are not included in the current policy.
+    if (work.oldReplScope != null) {
+      dmd.setReplScope(work.replScope);
     }
+    dmd.write();
 
     // Examine all the tables if required.
     if (shouldExamineTablesToDump()) {
+      // If required, wait more for any transactions open at the time of starting the ACID bootstrap.
+      if (needBootstrapAcidTablesDuringIncrementalDump()) {
+        assert (waitUntilTime > 0);
+        validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+      }
+
       Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
 
       try (Writer writer = new Writer(dumpRoot, conf)) {
@@ -259,8 +263,10 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
           Table table = hiveDb.getTable(dbName, tableName);
 
           // Dump external table locations if required.
-          if (shouldDumpExternalTableLocation() &&
-              TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+          // Note: If the repl policy is replaced, external table locations must also be dumped for any
+          // table getting replicated for the first time in the current dump. So, check whether the
+          // table was included in the old policy.
+          if ((shouldDumpExternalTableLocation() || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, tableName))
+              && TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
             writer.dataLocationDump(table);
           }
 
@@ -282,6 +288,12 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     return lastReplId;
   }
 
+  private boolean needBootstrapAcidTablesDuringIncrementalDump() {
+    // If an old replication policy is available, then it is possible that some of the ACID tables
+    // might be included for bootstrap during the incremental dump.
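+    // (Editor's note: tables newly added to the policy may be ACID tables, so the bootstrap path
+    // must be taken even when REPL_BOOTSTRAP_ACID_TABLES is not explicitly set by the user.)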
+    return (work.oldReplScope != null) || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+  }
+
   private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) {
     if (isIncrementalPhase) {
       dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
@@ -296,7 +308,8 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db)
         db,
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId()),
-        work.replScope
+        work.replScope,
+        work.oldReplScope
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 247066c451..7bae9ac66d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -30,6 +30,7 @@ Explain.Level.EXTENDED })
 public class ReplDumpWork implements Serializable {
   final ReplScope replScope;
+  final ReplScope oldReplScope;
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   final Long eventFrom;
   Long eventTo;
@@ -40,10 +41,11 @@ public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
   }
 
-  public ReplDumpWork(ReplScope replScope,
+  public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope,
       Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
       String resultTempPath) {
     this.replScope = replScope;
+    this.oldReplScope = oldReplScope;
     this.dbNameOrPattern = replScope.getDbName();
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index f4f98a69d5..45b5b5a1cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import com.google.common.collect.Collections2;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
@@ -53,6 +55,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
@@ -364,6 +367,35 @@ private void cleanTablesFromBootstrap() throws HiveException, IOException, Inval
     }
   }
 
+  /**
+   * If the replication policy is changed between the previous and current load, then the tables
+   * excluded by the new replication policy will be dropped.
+   * @throws HiveException Failed to get/drop the tables.
+   */
+  private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveException {
+    // If all tables are included in the replication scope, then nothing needs to be dropped.
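+    // Editor's illustration: if the policy changed from db.['t[0-9]+'] to db.['t1', 't2'], a
+    // previously replicated table t3 no longer matches and is dropped by this method.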
+    if ((replScope == null) || replScope.includeAllTables()) {
+      return;
+    }
+
+    Hive db = getHive();
+    String dbName = replScope.getDbName();
+
+    // List all the tables that are excluded from the current repl scope.
+    Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
+        tableName -> {
+          assert(tableName != null);
+          return !tableName.toLowerCase().startsWith(
+              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+              && !replScope.tableIncludedInReplScope(tableName);
+        });
+    for (String table : tableNames) {
+      db.dropTable(dbName + "." + table, true);
+    }
+    LOG.info("Tables in the database {} that are excluded from the replication scope have been dropped.",
+        dbName);
+  }
+
   private void createEndReplLogTask(Context context, Scope scope, ReplLogger replLogger)
       throws SemanticException {
     Map<String, String> dbProps;
@@ -457,6 +489,10 @@ private int executeIncrementalLoad(DriverContext driverContext) {
       work.needCleanTablesFromBootstrap = false;
     }
 
+    // If the replication policy is changed between the previous and current repl load, then drop
+    // the tables that are excluded by the new replication policy.
+    dropTablesExcludedInReplScope(work.changedReplScope);
+
     IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
     // If incremental events are already applied, then check and perform if need to bootstrap any tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f1f764e94b..c25afb4b41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -20,6 +20,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
@@ -42,6 +43,7 @@ Explain.Level.EXTENDED })
 public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
+  final ReplScope changedReplScope;
   final String dumpDirectory;
   final String bootstrapDumpToCleanTables;
   boolean needCleanTablesFromBootstrap;
@@ -61,12 +63,19 @@
    */
   final LineageState sessionStateLineageState;
 
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
-      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
-      List pathsToCopyIterator) throws IOException {
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
+      String dbNameToLoadIn, ReplScope changedReplScope,
+      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
+      List pathsToCopyIterator) throws IOException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.changedReplScope = changedReplScope;
+
+    // If DB name is changed during REPL LOAD, then set it instead of referring to the source DB name.
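+    // Editor's note: e.g. "REPL LOAD targetDb FROM '<dumpDir>'" applied to a dump of srcDb.['t1']
+    // must drop excluded tables from targetDb, so the scope's DB name is rewritten here.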
+    if ((changedReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
+      changedReplScope.setDbName(dbNameToLoadIn);
+    }
 
     this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
     this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 2fcbe64aa7..0e52ece96d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.repl.ReplConst;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -277,4 +278,8 @@ public static boolean includeAcidTableInDump(HiveConf conf) {
 
     return true;
   }
+
+  public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) {
+    return ((replScope == null) || replScope.tableIncludedInReplScope(tableName));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index cfdf180a9a..388fa63002 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -894,12 +894,20 @@ replDumpStatement
 @after { popMsg(state); }
     : KW_REPL KW_DUMP
         (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)?
+        (KW_REPLACE replacePolicy=replReplacePolicy)?
         (KW_FROM (eventId=Number)
           (KW_TO (rangeEnd=Number))?
          (KW_LIMIT (batchSize=Number))?
        )?
        (KW_WITH replConf=replConfigs)?
-    -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    -> ^(TOK_REPL_DUMP $dbName $tablePolicy? $replacePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    ;
+
+replReplacePolicy
+@init { pushMsg("repl dump replaces replication policy", state); }
+@after { popMsg(state); }
+    :
+    (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> ^(TOK_REPLACE $dbName $tablePolicy?)
     ;
 
 replLoadStatement
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4f4a02f797..df41a2e978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -59,6 +59,7 @@
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_NULL;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
@@ -70,6 +71,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Replication Scope
   private ReplScope replScope = new ReplScope();
+  private ReplScope oldReplScope = null;
 
   private Long eventFrom;
   private Long eventTo;
@@ -130,12 +132,13 @@ private void setTxnConfigs() {
     }
   }
 
-  private void setReplDumpTablesList(Tree replTablesNode) throws HiveException {
+  private void setReplDumpTablesList(Tree replTablesNode, ReplScope replScope) throws HiveException {
     int childCount = replTablesNode.getChildCount();
     assert(childCount <= 2);
 
     // Traverse the children which can be either just include tables list or both include
     // and exclude tables lists.
+    String replScopeType = (replScope == this.replScope) ? "Current" : "Old";
     for (int listIdx = 0; listIdx < childCount; listIdx++) {
       Tree tablesListNode = replTablesNode.getChild(listIdx);
       assert(tablesListNode.getType() == TOK_REPL_TABLES_LIST);
@@ -151,21 +154,49 @@ private void setReplDumpTablesList(Tree replTablesNode) throws HiveException {
       }
 
       if (listIdx == 0) {
-        LOG.info("ReplScope: Set Included Tables List: {}", tablesList);
+        LOG.info("{} ReplScope: Set Included Tables List: {}", replScopeType, tablesList);
         replScope.setIncludedTablePatterns(tablesList);
       } else {
-        LOG.info("ReplScope: Set Excluded Tables List: {}", tablesList);
+        LOG.info("{} ReplScope: Set Excluded Tables List: {}", replScopeType, tablesList);
         replScope.setExcludedTablePatterns(tablesList);
       }
     }
   }
 
+  private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException {
+    oldReplScope = new ReplScope();
+    int childCount = oldReplPolicyTree.getChildCount();
+
+    // First child is the DB name and the optional second child is the tables list.
+    assert(childCount <= 2);
+
+    // First child is always the DB name. So set it.
+    oldReplScope.setDbName(oldReplPolicyTree.getChild(0).getText());
+    LOG.info("Old ReplScope: Set DB Name: {}", oldReplScope.getDbName());
+    if (!oldReplScope.getDbName().equalsIgnoreCase(replScope.getDbName())) {
+      LOG.error("DB name {} cannot be replaced with {} in the replication policy.",
+          oldReplScope.getDbName(), replScope.getDbName());
+      throw new SemanticException("DB name cannot be replaced in the replication policy.");
+    }
+
+    // If the old policy is just <dbName>, then the tables list won't be there.
+    if (childCount <= 1) {
+      return;
+    }
+
+    // Traverse the children which can be either just include tables list or both include
+    // and exclude tables lists.
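+    // Editor's sketch of the subtree walked here (per the replReplacePolicy grammar rule above):
+    //   TOK_REPLACE
+    //     +-- <dbName>
+    //     +-- TOK_REPL_TABLES (optional)
+    //           +-- TOK_REPL_TABLES_LIST (include patterns)
+    //           +-- TOK_REPL_TABLES_LIST (exclude patterns, optional)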
+    Tree oldPolicyTablesListNode = oldReplPolicyTree.getChild(1);
+    assert(oldPolicyTablesListNode.getType() == TOK_REPL_TABLES);
+    setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope);
+  }
+
   private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
     boolean isMetaDataOnly = false;
     String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
-    LOG.info("ReplScope: Set DB Name: {}", dbNameOrPattern);
+    LOG.info("Current ReplScope: Set DB Name: {}", dbNameOrPattern);
     replScope.setDbName(dbNameOrPattern);
 
     // Skip the first node, which is always required
@@ -185,7 +216,11 @@ private void initReplDump(ASTNode ast) throws HiveException {
         break;
       }
       case TOK_REPL_TABLES: {
-        setReplDumpTablesList(currNode);
+        setReplDumpTablesList(currNode, replScope);
         break;
       }
+      case TOK_REPLACE: {
+        setOldReplPolicy(currNode);
+        break;
+      }
       case TOK_FROM: {
@@ -247,6 +282,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
       Task replDumpWorkTask = TaskFactory
           .get(new ReplDumpWork(
               replScope,
+              oldReplScope,
               eventFrom,
               eventTo,
               ErrorMsg.INVALID_PATH.getMsg(ast),
@@ -413,6 +449,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         LOG.debug("{} contains an bootstrap dump", loadPath);
       }
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+          dmd.getReplScope(),
           queryState.getLineageState(), evDump, dmd.getEventTo(),
           dirLocationsToCopy(loadPath, evDump));
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
@@ -511,6 +548,6 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
       LOG.debug(" > " + s);
     }
     ctx.setResFile(ctx.getLocalTmpPath());
-    Utils.writeOutput(values, ctx.getResFile(), conf);
+    Utils.writeOutput(Collections.singletonList(values), ctx.getResFile(), conf);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index c2e26f0710..4500fb63e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -169,7 +169,7 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
   }
 
   private boolean shouldExport() {
-    return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, conf);
+    return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, null, conf);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index ce5ef06830..8a259a8734 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -59,18 +59,20 @@
     IDLE, ACTIVE
   }
 
-  public static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
+  public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf)
       throws SemanticException {
     DataOutputStream outStream = null;
     try {
       FileSystem fs = outputFile.getFileSystem(hiveConf);
       outStream = fs.create(outputFile);
-      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
-      for (int i = 1; i < values.size(); i++) {
-        outStream.write(Utilities.tabCode);
-        outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+      for (List<String> values : listValues) {
+        outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+        for (int i = 1; i < values.size(); i++) {
+          outStream.write(Utilities.tabCode);
+          outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+        }
+        outStream.write(Utilities.newLineCode);
       }
-      outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
       throw new SemanticException(e);
     } finally {
@@ -175,7 +177,7 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) thro
    * specific checks.
    */
   public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle,
-      boolean isEventDump, HiveConf hiveConf) {
+      boolean isEventDump, ReplScope oldReplScope, HiveConf hiveConf) {
     if (replicationSpec == null) {
       replicationSpec = new ReplicationSpec();
     }
@@ -194,6 +196,15 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab
       return false;
     }
 
+    // If the replication policy is replaced with a new included/excluded tables list, then events
+    // corresponding to tables which are not included in the old policy but are included in the new
+    // policy should be skipped. Those tables would be bootstrapped along with the current
+    // incremental replication dump.
+    // Note: If any event dump reaches here, the table is included in the new replication policy.
+    if (isEventDump && !ReplUtils.tableIncludedInReplScope(oldReplScope, tableHandle.getTableName())) {
+      return false;
+    }
+
     if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) {
       boolean shouldReplicateExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
           || replicationSpec.isMetadataOnly();
@@ -220,7 +231,9 @@ public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab
   }
 
   public static boolean shouldReplicate(NotificationEvent tableForEvent,
-      ReplicationSpec replicationSpec, Hive db, boolean isEventDump, HiveConf hiveConf) {
+      ReplicationSpec replicationSpec, Hive db,
+      boolean isEventDump, ReplScope oldReplScope,
+      HiveConf hiveConf) {
     Table table;
     try {
       table = db.getTable(tableForEvent.getDbName(), tableForEvent.getTableName());
@@ -230,7 +243,7 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent,
           .getTableName(), e);
       return false;
     }
-    return shouldReplicate(replicationSpec, table, isEventDump, hiveConf);
+    return shouldReplicate(replicationSpec, table, isEventDump, oldReplScope, hiveConf);
   }
 
   static List<Path> getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index d938cc15a6..be6574d562 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -32,6 +32,7 @@ boolean shouldReplicate(Context withinContext) {
         withinContext.replicationSpec,
         withinContext.db,
         true,
+        withinContext.oldReplScope,
         withinContext.hiveConf
     );
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 0756f59a81..b6d2d61616 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -67,7 +67,8 @@ public void handle(Context withinContext) throws Exception {
     }
 
     final Table qlMdTable = new Table(tobj);
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index bd25a6c88d..a9401021d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -49,6 +49,15 @@ public void handle(Context withinContext) throws Exception {
       return;
     }
 
+    // If the replication policy is replaced with a new included/excluded tables list, then events
+    // corresponding to tables which are not included in the old policy but are included in the new
+    // policy should be skipped. Those tables would be bootstrapped along with the current
+    // incremental replication dump.
+    // Note: If any event dump reaches here, the table is included in the new replication policy.
+    if (!ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope, eventMessage.getTableName())) {
+      return;
+    }
+
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(eventMessageAsJSON);
     dmd.write();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index e59bdf67f8..7d6599b768 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -100,7 +100,8 @@ public void handle(Context withinContext) throws Exception {
     }
 
     Table qlMdTable = new Table(tableObject);
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index f02cbb885f..89bd0bfbca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -88,7 +88,8 @@ public void handle(Context withinContext) throws Exception {
     Table qlMdTableBefore = new Table(before);
     if (!Utils
-        .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, true, withinContext.hiveConf)) {
+        .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore,
+            true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 995d634ed9..a13fe38140 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -113,7 +113,13 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv
         : new ArrayList<>(Collections2.filter(writeEventInfoList,
             writeEventInfo -> {
               assert(writeEventInfo != null);
-              return withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable());
+              // If the replication policy is replaced with a new included/excluded tables list,
+              // then events corresponding to tables which are included in both the old and new
+              // policies should be dumped. If a table is included in the new policy but not in the
+              // old policy, then it should be skipped; those tables would be bootstrapped along
+              // with the current incremental replication dump.
+              return (ReplUtils.tableIncludedInReplScope(withinContext.replScope, writeEventInfo.getTable())
+                  && ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope, writeEventInfo.getTable()));
             })));
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 8a838db508..be7f7ebdd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -54,7 +54,8 @@ public void handle(Context withinContext) throws Exception {
 
     Table qlMdTable = new Table(tobj);
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index 0d60c3136d..b75d12b357 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -41,15 +41,17 @@
     final HiveConf hiveConf;
     final ReplicationSpec replicationSpec;
     final ReplScope replScope;
+    final ReplScope oldReplScope;
 
     public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
-        ReplicationSpec replicationSpec, ReplScope replScope) {
+        ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope) {
       this.eventRoot = eventRoot;
       this.cmRoot = cmRoot;
       this.db = db;
      this.hiveConf = hiveConf;
      this.replicationSpec = replicationSpec;
      this.replScope = replScope;
+      this.oldReplScope = oldReplScope;
     }
 
     public Context(Context other) {
@@ -59,6 +61,7 @@ public Context(Context other) {
       this.hiveConf = other.hiveConf;
       this.replicationSpec = other.replicationSpec;
       this.replScope = other.replScope;
+      this.oldReplScope = other.oldReplScope;
     }
 
     void setEventRoot(Path eventRoot) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 1bcd52923b..116db5bb69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -58,7 +58,8 @@ public void handle(Context withinContext) throws Exception {
       withinContext.replicationSpec.setNoop(true);
     }
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index 54fc7a66be..23d088d744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -52,8 +52,8 @@ public void handle(Context withinContext) throws Exception {
       return;
     }
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true,
-        withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj),
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index 62db959a2e..ab314edef8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -37,7 +37,8 @@ UpdateTableColumnStatMessage eventMessage(String stringRepresentation) {
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON);
     Table qlMdTable = new Table(eventMessage.getTableObject());
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+        true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 552183af5b..bef4780eb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -52,7 +52,7 @@ public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
   @Override
   public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
-    if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, hiveConf)) {
+    if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, null, hiveConf)) {
       return;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index 974e105341..4c137eb365 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -19,32 +19,38 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import
java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class DumpMetaData {
   // wrapper class for reading and writing metadata about a dump
   // responsible for _dumpmetadata files
   private static final String DUMP_METADATA = "_dumpmetadata";
+  private static final Logger LOG = LoggerFactory.getLogger(DumpMetaData.class);
 
   private DumpType dumpType;
   private Long eventFrom = null;
   private Long eventTo = null;
+  private Path cmRoot;
   private String payload = null;
-  private boolean initialized = false;
+  private ReplScope replScope = null;
+  private boolean initialized = false;
 
   private final Path dumpFile;
   private final HiveConf hiveConf;
-  private Path cmRoot;
 
   public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -61,8 +67,66 @@ public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) {
     this.dumpType = lvl;
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
-    this.initialized = true;
     this.cmRoot = cmRoot;
+    this.initialized = true;
+  }
+
+  public void setPayload(String payload) {
+    this.payload = payload;
+  }
+
+  public void setReplScope(ReplScope replScope) {
+    this.replScope = replScope;
+  }
+
+  private void readReplScope(String line) throws IOException {
+    if (line == null) {
+      return;
+    }
+
+    String[] lineContents = line.split("\t");
+    if (lineContents.length < 1) {
+      return;
+    }
+
+    replScope = new ReplScope();
+
+    LOG.info("Read ReplScope: Set Db Name: {}.", lineContents[0]);
+    replScope.setDbName(lineContents[0]);
+
+    // Read/set include and exclude tables list.
+    int idx = readReplScopeTablesList(lineContents, 1, true);
+    readReplScopeTablesList(lineContents, idx, false);
+  }
+
+  private int readReplScopeTablesList(String[] lineContents, int startIdx, boolean includeList)
+      throws IOException {
+    // If the list doesn't exist, then return.
+    if (startIdx >= lineContents.length) {
+      return startIdx;
+    }
+
+    // Each tables list should start with "{" and end with "}".
+    if (!"{".equals(lineContents[startIdx])) {
+      throw new IOException("Invalid repl tables list data in dump metadata file. Missing \"{\".");
+    }
+
+    List<String> tableNames = new ArrayList<>();
+    for (int i = (startIdx + 1); i < lineContents.length; i++) {
+      String value = lineContents[i];
+      if ("}".equals(value)) {
+        if (includeList) {
+          LOG.info("Read ReplScope: Set Include Table Names: {}.", tableNames);
+          replScope.setIncludedTablePatterns(tableNames);
+        } else {
+          LOG.info("Read ReplScope: Set Exclude Table Names: {}.", tableNames);
+          replScope.setExcludedTablePatterns(tableNames);
+        }
+        return (i + 1);
+      }
+      tableNames.add(value);
+    }
+    throw new IOException("Invalid repl tables list data in dump metadata file. Missing \"}\".");
Missing \"}\"."); } private void loadDumpFromFile() throws SemanticException { @@ -71,7 +135,7 @@ private void loadDumpFromFile() throws SemanticException { // read from dumpfile and instantiate self FileSystem fs = dumpFile.getFileSystem(hiveConf); br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); - String line = null; + String line; if ((line = br.readLine()) != null) { String[] lineContents = line.split("\t", 5); setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), @@ -82,6 +146,7 @@ private void loadDumpFromFile() throws SemanticException { throw new IOException( "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); } + readReplScope(br.readLine()); } catch (IOException ioe) { throw new SemanticException(ioe); } finally { @@ -105,10 +170,6 @@ public String getPayload() throws SemanticException { return this.payload; } - public void setPayload(String payload) { - this.payload = payload; - } - public Long getEventFrom() throws SemanticException { initializeIfNot(); return eventFrom; @@ -119,6 +180,10 @@ public Long getEventTo() throws SemanticException { return eventTo; } + public ReplScope getReplScope() throws SemanticException { + initializeIfNot(); + return replScope; + } public Path getDumpFilePath() { return dumpFile; } @@ -134,17 +199,42 @@ private void initializeIfNot() throws SemanticException { } } + private List prepareReplScopeValues() { + assert(replScope != null); + + List values = new ArrayList<>(); + values.add(replScope.getDbName()); + + List includedTableNames = replScope.getIncludedTableNames(); + List excludedTableNames = replScope.getExcludedTableNames(); + if (includedTableNames != null) { + values.add("{"); + values.addAll(includedTableNames); + values.add("}"); + } + if (excludedTableNames != null) { + values.add("{"); + values.addAll(excludedTableNames); + values.add("}"); + } + LOG.info("Preparing ReplScope {} to dump.", values); + return values; + } public void write() throws SemanticException { - Utils.writeOutput( + List> listValues = new ArrayList<>(); + listValues.add( Arrays.asList( dumpType.toString(), eventFrom.toString(), eventTo.toString(), cmRoot.toString(), - payload), - dumpFile, - hiveConf + payload) + ); + if (replScope != null) { + listValues.add(prepareReplScopeValues()); + } + Utils.writeOutput(listValues, dumpFile, hiveConf ); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index a412d435fd..aacd29591d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -139,7 +139,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, task.initialize(queryState, null, null, null); task.setWork( - new ReplDumpWork(replScope, + new ReplDumpWork(replScope, null, Long.MAX_VALUE, Long.MAX_VALUE, "", Integer.MAX_VALUE, "") ); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java index b8c95d5e9b..14b58a3f51 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -29,8 +29,12 @@ public class ReplScope implements Serializable { 
   private String dbName;
   private Pattern dbNamePattern;
-  private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP and exist only if tableName == null.
-  private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP and exist only if tableName == null.
+
+  // Include and exclude table names/patterns exist only for REPL DUMP.
+  private List<String> includedTableNames;
+  private List<String> excludedTableNames;
+  private List<Pattern> includedTableNamePatterns;
+  private List<Pattern> excludedTableNamePatterns;
 
   public ReplScope() {
   }
@@ -49,12 +53,22 @@ public String getDbName() {
     return dbName;
   }
 
-  public void setIncludedTablePatterns(List<String> includedTableNamePatterns) {
-    this.includedTableNamePatterns = compilePatterns(includedTableNamePatterns);
+  public void setIncludedTablePatterns(List<String> includedTableNames) {
+    this.includedTableNames = includedTableNames;
+    this.includedTableNamePatterns = compilePatterns(includedTableNames);
+  }
+
+  public List<String> getIncludedTableNames() {
+    return includedTableNames;
+  }
+
+  public void setExcludedTablePatterns(List<String> excludedTableNames) {
+    this.excludedTableNames = excludedTableNames;
+    this.excludedTableNamePatterns = compilePatterns(excludedTableNames);
   }
 
-  public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) {
-    this.excludedTableNamePatterns = compilePatterns(excludedTableNamePatterns);
+  public List<String> getExcludedTableNames() {
+    return excludedTableNames;
   }
 
   public boolean includeAllTables() {
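
Editor's endnote (illustration, not part of the patch): with the DumpMetaData changes above, an incremental dump whose policy was replaced serializes the current ReplScope as a second tab-separated line of _dumpmetadata, with each table list wrapped in "{" and "}" tokens (see prepareReplScopeValues() and Utils.writeOutput()). Assuming a current policy of srcdb.['t1', 't2'].['t3'], that line would look like:

    srcdb \t { \t t1 \t t2 \t } \t { \t t3 \t }

(where \t denotes the tab separator). readReplScope() parses this line back into the ReplScope that REPL LOAD then uses to drop tables excluded by the new policy.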