diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b3a4754..d3cfca1 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -489,7 +489,15 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Indicates if repl dump should include information about external tables. It should be \n" + "used in conjunction with 'hive.repl.dump.metadata.only' set to false. if 'hive.repl.dump.metadata.only' \n" + " is set to true then this config parameter has no effect as external table meta data is flushed \n" - + " always by default."), + + " always by default. If this config parameter is enabled on an ongoing replication policy which is in\n" + + " the incremental phase, then 'hive.repl.bootstrap.external.tables' needs to be set to true for the first \n" + + " repl dump so that all external tables are bootstrapped."), + REPL_BOOTSTRAP_EXTERNAL_TABLES("hive.repl.bootstrap.external.tables", false, + "Indicates if repl dump should bootstrap the information about external tables during the incremental \n" + + "phase of replication. It should be used in conjunction with 'hive.repl.include.external.tables' \n" + + "set to true. If 'hive.repl.include.external.tables' is set to false, then this config parameter \n" + + "has no effect. This config parameter should be set to true only for the first repl dump taken after \n" + + "enabling 'hive.repl.include.external.tables'."), REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false, "If its set to true, REPL LOAD copies data files directly to the target table/partition location \n" + "instead of copying to staging directory first and then move to target location. 
This optimizes \n" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 0e3cefc..cc71cef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -387,6 +388,97 @@ public void externalTableIncrementalReplication() throws Throwable { } + @Test + public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { + List loadWithClause = externalTableBasePathWithClause(); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" + ); + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] {"t2" }); + + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .run("create table t4 as select * from t3") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. + assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3"), + new Path(tuple.dumpLocation, FILE_NAME)); + + // _bootstrap directory should be created as bootstrap enabled on external tables. 
+ Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); + + // _bootstrap//t2 + // _bootstrap//t3 + Path dbPath = new Path(dumpPath, primaryDbName); + Path tblPath = new Path(dbPath, "t2"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + tblPath = new Path(dbPath, "t3"); + assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .status(replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] {"t1" }) + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4"); + + // Drop source tables to see if target points to correct data or not after bootstrap load. + primary.run("use " + primaryDbName) + .run("drop table t2") + .run("drop table t3"); + + // Create table event for t4 should be applied along with bootstrapping of t2 and t3 + replica.run("use " + replicatedDbName) + .run("select place from t2 where country = 'us'") + .verifyResult("austin") + .run("select place from t2 where country = 'france'") + .verifyResult("paris") + .run("select id from t3 order by id") + .verifyResults(Arrays.asList("10", "20")) + .run("select id from t4 order by id") + .verifyResults(Arrays.asList("10", "20")); + } + private List externalTableBasePathWithClause() throws IOException, SemanticException { Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE); DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java index 5529d9e..a4b044d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java @@ -81,6 +81,7 @@ static void internalBeforeClassSetup(Map overrides, Class clazz) put("hive.mapred.mode", "nonstrict"); put("mapred.input.dir.recursive", "true"); put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.stats.autogather", "false"); }}; acidConfs.putAll(overrides); @@ -91,6 +92,7 @@ static void internalBeforeClassSetup(Map overrides, Class clazz) put("hive.support.concurrency", "false"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); put("hive.metastore.client.capability.check", "false"); + put("hive.stats.autogather", "false"); }}; replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 01ecf0a..0dd50de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import 
org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; @@ -120,7 +121,6 @@ import org.apache.hadoop.hive.ql.parse.ParseException; import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; -import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId; @@ -927,8 +927,8 @@ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TExceptio // Last logged notification event id would be the last repl Id for the current REPl DUMP. Hive hiveDb = Hive.get(); Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); - conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId); - LOG.debug("Setting " + ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY + " = " + lastReplId); + conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId); + LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId); } private void openTransaction() throws LockException, CommandProcessorResponse { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 947bfcf..acfa354 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -47,7 +48,6 @@ import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -80,15 +80,13 @@ public class ReplDumpTask extends Task implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; - private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; - private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; + private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; private static final long SLEEP_TIME = 60000; public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); private final String name; private final String prefix; - private ConstraintFileType(String name, String prefix) { + ConstraintFileType(String name, String prefix) { this.name = name; this.prefix = prefix; } @@ -195,13 +193,23 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot); dmd.write(); - if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && - !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + // If external tables are enabled for replication and + // - If bootstrap is enabled, 
then the bootstrap dump of external tables needs to be combined with this incremental dump. + // - If metadata-only dump is enabled, then dumping external tables' data locations to the + // _external_tables_info file is skipped. If not metadata-only, then the data locations are dumped. + if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) + && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) + || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES))) { + Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { Table table = hiveDb.getTable(dbName, tableName); if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { writer.dataLocationDump(table); + if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)) { + HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); + dumpTable(dbName, tableName, null, dbRoot, 0, hiveDb, tableTuple); + } } } } @@ -209,6 +217,13 @@ return lastReplId; } + private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncrementalPhase) { + if (isIncrementalPhase) { + dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + } + return new Path(dumpRoot, dbName); + } + private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, @@ -237,7 +252,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th // Last repl id would've been captured during compile phase in queryState configs before opening txn. // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore. - Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L); + Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0L); String validTxnList = getValidTxnListForReplDump(hiveDb); @@ -252,6 +267,9 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; + boolean shouldWriteExternalTableLocationInfo = + conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) + && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); try (Writer writer = new Writer(dbRoot, conf)) { for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { LOG.debug( @@ -259,12 +277,9 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th try { HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); - boolean shouldWriteExternalTableLocationInfo = - conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) - && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType()) - && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); - if (shouldWriteExternalTableLocationInfo) { - LOG.debug("adding table {} to external tables list", tblName); + if (shouldWriteExternalTableLocationInfo + && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { + LOG.debug("Adding table {} to external tables list", tblName); writer.dataLocationDump(tableTuple.object); } dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, @@ -312,7 +327,7 @@ long currentNotificationId(Hive hiveDb) throws TException { } Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception { - Path dbRoot = new Path(dumpRoot, dbName); + Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, false); // TODO : instantiating FS objects are generally costly. Refactor FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); @@ -435,7 +450,7 @@ private String getNextDumpDir() { } void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception { - Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME); + Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME); List functionNames = hiveDb.getFunctions(dbName, "*"); for (String functionName : functionNames) { HiveWrapper.Tuple tuple = functionTuple(functionName, dbName, hiveDb); @@ -455,7 +470,7 @@ void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exce void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception { try { - Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME); + Path constraintsRoot = new Path(dbRoot, ReplUtils.CONSTRAINTS_ROOT_DIR_NAME); Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName); Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName); List pks = hiveDb.getPrimaryKeyList(dbName, tblName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 012df9d..9fee1c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -76,13 +76,14 @@ public static String externalTableLocation(HiveConf hiveConf, String location) { private static Logger LOG = LoggerFactory.getLogger(Writer.class); private final HiveConf hiveConf; private final Path writePath; - private final Boolean excludeExternalTables, dumpMetadataOnly; + private final boolean includeExternalTables; + private final boolean dumpMetadataOnly; private OutputStream writer; Writer(Path dbRoot, HiveConf hiveConf) throws IOException { this.hiveConf = hiveConf; writePath = new Path(dbRoot, FILE_NAME); - excludeExternalTables = !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); + includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); if (shouldWrite()) { this.writer = FileSystem.get(hiveConf).create(writePath); @@ 
-90,7 +91,7 @@ public static String externalTableLocation(HiveConf hiveConf, String location) { } private boolean shouldWrite() { - return !dumpMetadataOnly && !excludeExternalTables; + return !dumpMetadataOnly && includeExternalTables; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 2126aab..4dc14f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -49,7 +50,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; @@ -61,6 +64,11 @@ public String getName() { return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD"); } + @Override + public StageType getType() { + return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD; + } + /** * Provides the root Tasks created as a result of this loadTask run which will be executed * by the driver. It does not track details across multiple runs of LoadTask. @@ -96,8 +104,8 @@ private int executeBootStrapLoad(DriverContext driverContext) { of multiple databases once we have the basic flow to chain creating of tasks in place for a database ( directory ) */ - BootstrapEventsIterator iterator = work.iterator(); - ConstraintEventsIterator constraintIterator = work.constraintIterator(); + BootstrapEventsIterator iterator = work.bootstrapIterator(); + ConstraintEventsIterator constraintIterator = work.constraintsIterator(); /* This is used to get hold of a reference during the current creation of tasks and is initialized with "0" tasks such that it will be non consequential in any operations done with task tracker @@ -241,7 +249,11 @@ a database ( directory ) if (addAnotherLoadTask) { createBuilderTask(scope.rootTasks); } - if (!iterator.hasNext() && !constraintIterator.hasNext()) { + + // Update last repl ID of the database only if the current dump is not incremental. If bootstrap + // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change + // last repl ID of the database. 
+ if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); work.updateDbEventState(null); } @@ -268,9 +280,17 @@ a database ( directory ) } private void createEndReplLogTask(Context context, Scope scope, - ReplLogger replLogger) throws SemanticException { - Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters()); + ReplLogger replLogger) throws SemanticException { + Map dbProps; + if (work.isIncrementalLoad()) { + dbProps = new HashMap<>(); + dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), + work.incrementalLoadTasksBuilder().eventTo().toString()); + } else { + Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); + dbProps = dbInMetadata.getParameters(); + } + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps); Task replLogTask = TaskFactory.get(replLogWork); if (scope.rootTasks.isEmpty()) { scope.rootTasks.add(replLogTask); @@ -344,20 +364,25 @@ private void createBuilderTask(List> rootTasks) { DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); } - @Override - public StageType getType() { - return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD; - } - private int executeIncrementalLoad(DriverContext driverContext) { try { + IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder(); + + // If incremental events are already applied, then check and perform if need to bootstrap any tables. + if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + if (work.hasBootstrapLoadTasks()) { + LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap " + + "mode after applying all events."); + return executeBootStrapLoad(driverContext); + } + } + List> childTasks = new ArrayList<>(); int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER); // during incremental we will have no parallelism from replication tasks since they are event based - // and hence are linear. To achieve prallelism we have to use copy tasks(which have no DAG) for + // and hence are linear. To achieve parallelism we have to use copy tasks(which have no DAG) for // all threads except one, in execution phase. int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - IncrementalLoadTasksBuilder builder = work.getIncrementalLoadTaskBuilder(); // If the total number of tasks that can be created are less than the parallelism we can achieve // do nothing since someone is working on 1950's machine. else try to achieve max parallelism @@ -374,17 +399,18 @@ private int executeIncrementalLoad(DriverContext driverContext) { } TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks); Task incrementalLoadTaskRoot = - builder.build(driverContext, getHive(), LOG, work, trackerForReplIncremental); + builder.build(driverContext, getHive(), LOG, trackerForReplIncremental); // we are adding the incremental task first so that its always processed first, // followed by dir copy tasks if capacity allows. 
childTasks.add(incrementalLoadTaskRoot); TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks); - childTasks - .addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy)); + childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy)); - // either the incremental has more work or the external table file copy has more paths to process - if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext()) { + // Either the incremental has more work or the external table file copy has more paths to process. + // Once all the incremental events are applied and external tables file copies are done, enable + // bootstrap of tables if exist. + if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); } this.childTasks = childTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index e86a5fa..7539281 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.repl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; @@ -44,8 +47,7 @@ private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; private final transient BootstrapEventsIterator bootstrapIterator; - private final transient IncrementalLoadEventsIterator incrementalIterator; - private final transient IncrementalLoadTasksBuilder incrementalLoad; + private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder; private transient Task rootTask; private final transient Iterator pathsToCopyIterator; @@ -65,26 +67,36 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad this.dbNameToLoadIn = dbNameToLoadIn; rootTask = null; if (isIncrementalDump) { - incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf); - this.bootstrapIterator = null; - this.constraintsIterator = null; - incrementalLoad = + incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, - incrementalIterator, hiveConf, eventTo); + new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo); + + /* + * If the current incremental dump also includes bootstrap for some tables, then create iterator + * for the same. 
+ */ + Path incBootstrapDir = new Path(dumpDirectory, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + FileSystem fs = incBootstrapDir.getFileSystem(hiveConf); + if (fs.exists(incBootstrapDir)) { + this.bootstrapIterator = new BootstrapEventsIterator(incBootstrapDir.toString(), dbNameToLoadIn, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); + } else { + this.bootstrapIterator = null; + this.constraintsIterator = null; + } } else { this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); - incrementalIterator = null; - incrementalLoad = null; + incrementalLoadTasksBuilder = null; } this.pathsToCopyIterator = pathsToCopyIterator.iterator(); } - public BootstrapEventsIterator iterator() { + BootstrapEventsIterator bootstrapIterator() { return bootstrapIterator; } - public ConstraintEventsIterator constraintIterator() { + ConstraintEventsIterator constraintsIterator() { return constraintsIterator; } @@ -104,16 +116,17 @@ boolean hasDbState() { return state != null; } - public boolean isIncrementalLoad() { - return incrementalIterator != null; + boolean isIncrementalLoad() { + return incrementalLoadTasksBuilder != null; } - public IncrementalLoadEventsIterator getIncrementalIterator() { - return incrementalIterator; + boolean hasBootstrapLoadTasks() { + return (((bootstrapIterator != null) && bootstrapIterator.hasNext()) + || ((constraintsIterator != null) && constraintsIterator.hasNext())); } - public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() { - return incrementalLoad; + IncrementalLoadTasksBuilder incrementalLoadTasksBuilder() { + return incrementalLoadTasksBuilder; } public Task getRootTask() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 60ad6d3..ef6e31f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -22,8 +22,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; @@ -93,14 +93,9 @@ public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, Hive + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - List dbsToCreate = Arrays.stream(fileStatuses).filter(f -> { - Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME); - try { - return fileSystem.exists(metadataPath); - } catch (IOException e) { - throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e); - } - }).collect(Collectors.toList()); + List dbsToCreate = Arrays.stream(fileStatuses).filter( + f -> !f.getPath().getName().equals(ReplUtils.CONSTRAINTS_ROOT_DIR_NAME) + ).collect(Collectors.toList()); dbEventsIterator = 
dbsToCreate.stream().map(f -> { try { return new DatabaseEventsIterator(f.getPath(), hiveConf); @@ -167,7 +162,7 @@ private void initReplLogger() { long numTables = getSubDirs(fs, dbDumpPath).length; long numFunctions = 0; - Path funcPath = new Path(dbDumpPath, ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME); + Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); if (fs.exists(funcPath)) { numFunctions = getSubDirs(fs, funcPath).length; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index 32518e0..73c3a7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -28,8 +28,8 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.ConstraintFileType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; public class ConstraintEventsIterator implements Iterator { private FileStatus[] dbDirs; @@ -47,7 +47,7 @@ public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir, String prefix) { try { - return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() { + return fs.listStatus(new Path(dbDir, ReplUtils.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() { public boolean accept(Path p) { return p.getName().startsWith(prefix); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index e0f8f72..874edb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -39,7 +39,7 @@ import java.util.stream.Collectors; import java.util.ArrayList; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.FUNCTIONS_ROOT_DIR_NAME; class DatabaseEventsIterator implements Iterator { private static Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class); @@ -122,7 +122,7 @@ public boolean hasNext() { while (remoteIterator.hasNext()) { LocatedFileStatus next = remoteIterator.next(); // we want to skip this file, this also means there cant be a table with name represented - // by constantReplExternalTables.FILE_NAME + // by constant ReplExternalTables.FILE_NAME if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) { continue; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 076165a..99a8d5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -123,9 +123,6 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { tableDesc.setExternal(true); } tableDesc.setReplicationSpec(replicationSpec()); - if (table.getTableType() == TableType.EXTERNAL_TABLE) { - tableDesc.setExternal(true); - } return tableDesc; } catch (Exception e) { throw new SemanticException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index ad41276..65b7aa0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -159,13 +159,6 @@ private ReplicationState initialReplicationState() throws SemanticException { } private TaskTracker forNewTable() throws Exception { - Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); - // If table doesn't exist, allow creating a new one only if the database state is older than the update. - // This in-turn applicable for partitions creation as well. - if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) { - return tracker; - } - Iterator iterator = event.partitionDescriptions(tableDesc).iterator(); while (iterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc currentPartitionDesc = iterator.next(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java index 5638ace..f2c8e8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import java.io.IOException; @@ -42,7 +42,7 @@ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException { Path eventPath = new Path(loadPath); FileSystem fs = eventPath.getFileSystem(conf); - eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs)); + eventDirs = fs.listStatus(eventPath, ReplUtils.getEventsDirectoryFilter(fs)); if ((eventDirs == null) || (eventDirs.length == 0)) { currentIndex = 0; numEvents = 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 7ae33e3..e065b41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -92,7 +92,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP } public Task build(DriverContext driverContext, Hive hive, Logger log, - ReplLoadWork loadWork, TaskTracker tracker) throws Exception { + TaskTracker tracker) throws Exception { Task evTaskRoot = TaskFactory.get(new 
DependencyCollectionWork()); Task taskChainTail = evTaskRoot; Long lastReplayedEvent = null; @@ -173,9 +173,6 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); - if (loadWork.getPathsToCopyIterator().hasNext()) { - taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf)); - } } return evTaskRoot; } @@ -396,6 +393,10 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa return tasks; } + public Long eventTo() { + return eventTo; + } + public static long getNumIteration() { return numIteration; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 4fdd12a..91eeb13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.repl.util; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -56,12 +58,19 @@ public class ReplUtils { + public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id"; public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; // write id allocated in the current execution context which will be passed through config to be used by different // tasks. public static final String REPL_CURRENT_TBL_WRITE_ID = "hive.repl.current.table.write.id"; + public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; + + // Root directory for dumping bootstrapped tables along with incremental events dump. + public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap"; + // Migrating to transactional tables in bootstrap load phase. // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1. 
public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L; @@ -167,4 +176,15 @@ public static boolean replCkptStatus(String dbName, Map props, S } return taskList; } + + // Path filters to filter only events (directories) excluding "_bootstrap" + public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { + return p -> { + try { + return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 4e7595c..2036d69 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -85,10 +85,6 @@ private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; - public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id"; - public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; - ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); this.db = super.db; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java index 2fa3676..d01e24c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -62,6 +62,10 @@ public HiveWrapper(Hive db, String dbName, long lastReplId) { getColStats)); } + public Tuple
table(final Table tblObj) throws HiveException { + return new Tuple<>(functionForSpec, () -> tblObj); + } + public static class Tuple { interface Function { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 1eee3fd..4cd4d70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -178,7 +178,7 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } private boolean shouldExport() { - return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf); + return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, false, conf); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 21df63c..3cac813 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -166,8 +166,8 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) thro * validates if a table can be exported, similar to EximUtil.shouldExport with few replication * specific checks. */ - public static Boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, - HiveConf hiveConf) { + public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, + boolean isEventDump, HiveConf hiveConf) { if (replicationSpec == null) { replicationSpec = new ReplicationSpec(); } @@ -187,14 +187,21 @@ public static Boolean shouldReplicate(ReplicationSpec replicationSpec, Table tab } if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) { - return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) || replicationSpec.isMetadataOnly(); + boolean shouldReplicateExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) + || replicationSpec.isMetadataOnly(); + if (isEventDump) { + // Skip dumping of events related to external tables if bootstrap is enabled on it. 
+ shouldReplicateExternalTables = shouldReplicateExternalTables + && !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES); + } + return shouldReplicateExternalTables; } } return true; } public static boolean shouldReplicate(NotificationEvent tableForEvent, - ReplicationSpec replicationSpec, Hive db, HiveConf hiveConf) { + ReplicationSpec replicationSpec, Hive db, boolean isEventDump, HiveConf hiveConf) { Table table; try { table = db.getTable(tableForEvent.getDbName(), tableForEvent.getTableName()); @@ -204,7 +211,7 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent, .getTableName(), e); return false; } - return shouldReplicate(replicationSpec, table, hiveConf); + return shouldReplicate(replicationSpec, table, isEventDump, hiveConf); } static List getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java index 672f402..d938cc1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java @@ -31,6 +31,7 @@ boolean shouldReplicate(Context withinContext) { event, withinContext.replicationSpec, withinContext.db, + true, withinContext.hiveConf ); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 415e954..0756f59 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -67,7 +67,7 @@ public void handle(Context withinContext) throws Exception { } final Table qlMdTable = new Table(tobj); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index 1b91e3e..e59bdf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -100,7 +100,7 @@ public void handle(Context withinContext) throws Exception { } Table qlMdTable = new Table(tableObject); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index ff43399..4deb551 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -87,7 +87,7 @@ public void handle(Context withinContext) throws Exception { Table qlMdTableBefore = new Table(before); if (!Utils - .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, 
withinContext.hiveConf)) { + .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index a8bf671..8a838db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -54,7 +54,7 @@ public void handle(Context withinContext) throws Exception { Table qlMdTable = new Table(tobj); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index f029fee..1bcd529 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -58,7 +58,7 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec.setNoop(true); } - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java index f3f00c5..79e1361 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -59,7 +59,7 @@ public void handle(Context withinContext) throws Exception { return; } - if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), + if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java index bd8182d..ca9af5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java @@ -38,7 +38,7 @@ UpdateTableColumnStatMessage eventMessage(String stringRepresentation) { public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTable = new Table(eventMessage.getTableObject()); - if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, withinContext.hiveConf)) { return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index dac20d2..4d8ffe9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -52,7 +52,7 @@ public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle, @Override public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { - if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, hiveConf)) { + if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, false, hiveConf)) { return; }
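
The new knob introduced by this patch is driven entirely through the dump-side configuration, as exercised by the dumpWithClause lists in the test above. Below is a minimal sketch of how a caller might assemble those WITH-clause entries for the first incremental dump after enabling external-table replication; the class and method names are illustrative, not part of the patch.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only: builds the WITH-clause entries a REPL DUMP command would carry.
public final class ReplDumpOptions {

  // First incremental dump after turning on external-table replication: both configs are set,
  // so external tables are bootstrapped under the _bootstrap directory of this dump.
  public static List<String> firstDumpAfterEnablingExternalTables() {
    return Arrays.asList(
        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
  }

  // Subsequent incremental dumps: only the include flag stays on; the bootstrap flag is dropped,
  // so external tables are carried through events and the _external_tables_info file alone.
  public static List<String> subsequentDumps() {
    return Arrays.asList(
        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'");
  }
}
```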
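On the dump side, the decision of when to write external-table data locations and when to additionally bootstrap those tables follows from the interplay of three configs, as described in the comment added to the incremental dump path. The helper class below is a hypothetical, simplified restatement of that predicate; the real logic lives inline in ReplDumpTask#incrementalDump.

```java
import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical helper restating the config interplay added to the incremental dump path.
public final class ExternalTableDumpPolicy {

  // The external-table loop (and the _external_tables_info file) is entered when external tables
  // are included and either this is not a metadata-only dump or a bootstrap was requested.
  public static boolean shouldProcessExternalTables(HiveConf conf) {
    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
        && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
            || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
  }

  // Each external table is additionally dumped in bootstrap form under the _bootstrap directory
  // only when the bootstrap flag is on for this incremental dump.
  public static boolean shouldBootstrapTable(HiveConf conf) {
    return shouldProcessExternalTables(conf)
        && conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES);
  }
}
```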
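On the load side, REPL LOAD only needs to detect whether an incremental dump also carries an embedded bootstrap: the patch has ReplLoadWork look for the "_bootstrap" child directory (ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) under the dump root and, if present, switch to bootstrap mode after all events and external data copies are applied. The probe below is a self-contained sketch of that check under those assumptions; the class and method names are made up for illustration.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative probe: an incremental dump that also bootstraps tables contains a "_bootstrap"
// child directory; its presence is what makes the load create a bootstrap iterator.
public final class IncrementalDumpProbe {

  public static boolean hasEmbeddedBootstrap(String dumpDirectory, HiveConf conf) throws IOException {
    Path incBootstrapDir = new Path(dumpDirectory, "_bootstrap");
    FileSystem fs = incBootstrapDir.getFileSystem(conf);
    return fs.exists(incBootstrapDir);
  }
}
```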