diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 9a2d296c05..5bdc88f540 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -341,7 +341,7 @@ public void testTxnEventNonAcid() throws Throwable { WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); - replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation) + replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'") .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 35437b14da..63522c5841 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -820,6 +820,50 @@ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); } + @Test + public void testIncrementalDumpMultiIteration() throws Throwable { + WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(bootstrapTuple.lastReplicationId); + + WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName) + .run("create table table1 (id int) partitioned by (country string)") + .run("create table table2 (id int)") + .run("create table table3 (id int) partitioned by (country string)") + .run("insert into table1 partition(country='india') values(1)") + .run("insert into table2 values(2)") + .run("insert into table3 partition(country='india') values(3)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'")) + .status(replicatedDbName) + .verifyResult(incremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[] {"1" }) + .run("select * from table2") + .verifyResults(new String[] {"2" }) + .run("select id from table3") + .verifyResults(new String[] {"3" }); + + incremental = primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table4 (i int, j int)") + .run("insert into table4 values (1,2)") + .dump(primaryDbName, incremental.lastReplicationId, + Arrays.asList("'hive.repl.dump.include.acid.tables'='true'")); + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'")) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1", "table2", "table3", "table4" }) + .run("select i from table4") + 
.verifyResult("1"); + } + @Test public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index ad778e3245..d43eed3153 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -103,7 +103,8 @@ enum StageType { REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, - REPL_TXN + REPL_TXN, + REPL_INCREMENTAL_LOAD } struct Stage { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index b6eb12ab13..73bbe3a377 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -119,7 +119,8 @@ int _kStageTypeValues[] = { StageType::REPL_DUMP, StageType::REPL_BOOTSTRAP_LOAD, StageType::REPL_STATE_LOG, - StageType::REPL_TXN + StageType::REPL_TXN, + StageType::REPL_INCREMENTAL_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -137,9 +138,10 @@ const char* _kStageTypeNames[] = { "REPL_DUMP", "REPL_BOOTSTRAP_LOAD", "REPL_STATE_LOG", - "REPL_TXN" + "REPL_TXN", + "REPL_INCREMENTAL_LOAD" }; -const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index eb02107e64..04c749f1eb 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -97,7 +97,8 @@ struct StageType { REPL_DUMP = 12, REPL_BOOTSTRAP_LOAD = 13, REPL_STATE_LOG = 14, - REPL_TXN = 15 + REPL_TXN = 15, + REPL_INCREMENTAL_LOAD = 16 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 08822b3cc7..7eebe28732 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -27,7 +27,8 @@ REPL_DUMP(12), REPL_BOOTSTRAP_LOAD(13), REPL_STATE_LOG(14), - REPL_TXN(15); + REPL_TXN(15), + REPL_INCREMENTAL_LOAD(16); private final int value; @@ -80,6 +81,8 @@ public static StageType findByValue(int value) { return REPL_STATE_LOG; case 15: return REPL_TXN; + case 16: + return REPL_INCREMENTAL_LOAD; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index df4e41db93..1a36d08f92 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -118,6 +118,7 @@ final class StageType { const REPL_BOOTSTRAP_LOAD = 13; const REPL_STATE_LOG = 14; const REPL_TXN = 15; + const REPL_INCREMENTAL_LOAD = 16; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -135,6 +136,7 @@ final class StageType { 13 => 'REPL_BOOTSTRAP_LOAD', 14 => 'REPL_STATE_LOG', 15 => 'REPL_TXN', + 16 => 'REPL_INCREMENTAL_LOAD', ); } diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 85d39fdbe1..c0a22044a7 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -164,6 +164,7 @@ class StageType: REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 
14 REPL_TXN = 15 + REPL_INCREMENTAL_LOAD = 16 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -182,6 +183,7 @@ class StageType: 13: "REPL_BOOTSTRAP_LOAD", 14: "REPL_STATE_LOG", 15: "REPL_TXN", + 16: "REPL_INCREMENTAL_LOAD", } _NAMES_TO_VALUES = { @@ -201,6 +203,7 @@ class StageType: "REPL_BOOTSTRAP_LOAD": 13, "REPL_STATE_LOG": 14, "REPL_TXN": 15, + "REPL_INCREMENTAL_LOAD": 16, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 6010f3d21e..61349a2191 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -76,8 +76,9 @@ module StageType REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 REPL_TXN = 15 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze + REPL_INCREMENTAL_LOAD = 16 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze end class Adjacency diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 3a107b7e81..47a802f4f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java similarity index 90% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 97917f8df6..70ae049ee7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; +package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.IncrementalLoad; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; @@ -56,7 +57,7 @@ @Override public String getName() { - return "REPL_BOOTSTRAP_LOAD"; + return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD"); } /** @@ -70,6 +71,14 @@ public String getName() { @Override protected int execute(DriverContext driverContext) { + if (work.isIncrementalLoad()) { + return executeIncrementalLoad(driverContext); + } else { + return executeBootStrapLoad(driverContext); + } + } + + private int executeBootStrapLoad(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); Context context = new Context(work.dumpDirectory, conf, getHive(), @@ -203,7 +212,9 @@ a database ( directory ) } boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() || constraintIterator.hasNext(); - createBuilderTask(scope.rootTasks, addAnotherLoadTask); + if (addAnotherLoadTask) { + createBuilderTask(scope.rootTasks); + } if (!iterator.hasNext() && !constraintIterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); work.updateDbEventState(null); @@ -218,8 +229,11 @@ a database ( directory ) // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs()); + } catch (RuntimeException e) { + LOG.error("replication failed with run time exception", e); + throw e; } catch (Exception e) { - LOG.error("failed replication", e); + LOG.error("replication failed", e); setException(e); return 1; } @@ -292,19 +306,30 @@ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) } } - private void createBuilderTask(List> rootTasks, - boolean shouldCreateAnotherLoadTask) { - /* - use loadTask as dependencyCollection - */ - if (shouldCreateAnotherLoadTask) { - Task loadTask = TaskFactory.get(work, conf); - DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); - } + private void createBuilderTask(List> rootTasks) { + // Use loadTask as dependencyCollection + Task loadTask = TaskFactory.get(work, conf); + DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); } @Override public StageType getType() { - return 
StageType.REPL_BOOTSTRAP_LOAD; + return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD; + } + + private int executeIncrementalLoad(DriverContext driverContext) { + try { + IncrementalLoad load = work.getIncrementalLoad(); + childTasks = Collections.singletonList(load.execute(driverContext, getHive(), LOG)); + if (work.getIncrementalIterator().hasNext()) { + // attach a load task at the tail of task list to start the next iteration. + createBuilderTask(childTasks); + } + return 0; + } catch (Exception e) { + LOG.error("failed replication", e); + setException(e); + return 1; + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java similarity index 66% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 048727fd94..b833a21e52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; +package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.IncrementalLoadEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.IncrementalLoad; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; @@ -34,10 +36,12 @@ final String dbNameToLoadIn; final String tableNameToLoadIn; final String dumpDirectory; - private final BootstrapEventsIterator iterator; + private final transient BootstrapEventsIterator bootstrapIterator; private final ConstraintEventsIterator constraintsIterator; + private final transient IncrementalLoadEventsIterator incrementalIterator; private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; + private final transient IncrementalLoad incrementalLoad; /* these are sessionState objects that are copied over to work to allow for parallel execution. 
@@ -47,23 +51,32 @@ final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState) - throws IOException { + String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; - this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); - this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; + if (isIncrementalDump) { + incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf); + this.bootstrapIterator = null; + this.constraintsIterator = null; + incrementalLoad = new IncrementalLoad(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, + incrementalIterator, hiveConf); + } else { + this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); + incrementalIterator = null; + incrementalLoad = null; + } } public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, LineageState lineageState) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState); + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false); } public BootstrapEventsIterator iterator() { - return iterator; + return bootstrapIterator; } public ConstraintEventsIterator constraintIterator() { @@ -85,4 +98,16 @@ DatabaseEvent databaseEvent(HiveConf hiveConf) { boolean hasDbState() { return state != null; } + + public boolean isIncrementalLoad() { + return incrementalIterator != null; + } + + public IncrementalLoadEventsIterator getIncrementalIterator() { + return incrementalIterator; + } + + public IncrementalLoad getIncrementalLoad() { + return incrementalLoad; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java index 0313058b0d..dcc54f997d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java @@ -28,7 +28,7 @@ public class AddDependencyToLeaves implements DAGTraversal.Function { private List> postDependencyCollectionTasks; - AddDependencyToLeaves(List> postDependencyCollectionTasks) { + public AddDependencyToLeaves(List> postDependencyCollectionTasks) { this.postDependencyCollectionTasks = postDependencyCollectionTasks; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/IncrementalLoadEventsIterator.java new file mode 100644 index 0000000000..1a91be8cc9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/IncrementalLoadEventsIterator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * IncrementalLoadEventsIterator
+ * Helper class to iterate through the event dump directory.
+ */
+public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
+  private FileStatus[] eventDirs;
+  private int currentIndex;
+  private int numEvents;
+
+  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException {
+    Path eventPath = new Path(loadPath);
+    FileSystem fs = eventPath.getFileSystem(conf);
+    eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+    // For event dump, each sub-dir is an individual event dump.
+    // We need to guarantee that the directory listing we got is in order of event id.
+    Arrays.sort(eventDirs, new EventDumpDirComparator());
+    currentIndex = 0;
+    numEvents = eventDirs.length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (eventDirs != null && currentIndex < eventDirs.length);
+  }
+
+  @Override
+  public FileStatus next() {
+    if (hasNext()) {
+      return eventDirs[currentIndex++];
+    } else {
+      throw new NoSuchElementException("no more events");
+    }
+  }
+
+  public int getNumEvents() {
+    return numEvents;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/IncrementalLoad.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/IncrementalLoad.java
new file mode 100644
index 0000000000..f973b03e19
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/IncrementalLoad.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.IncrementalLoadEventsIterator; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; +import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.slf4j.Logger; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.HashSet; + +/** + * IncrementalLoad + * Iterate through the dump directory and create tasks to load the events. 
+ */ +public class IncrementalLoad { + private final String dbName, tableName; + private final IncrementalLoadEventsIterator iterator; + private HashSet inputs; + private HashSet outputs; + private Logger log; + private final HiveConf conf; + private final String loadPath; + + public IncrementalLoad(String dbName, String tableName, String loadPath, + IncrementalLoadEventsIterator iterator, HiveConf conf) { + this.dbName = dbName; + this.tableName = tableName; + this.iterator = iterator; + inputs = new HashSet<>(); + outputs = new HashSet<>(); + log = null; + this.conf = conf; + this.loadPath = loadPath; + } + + public Task execute(DriverContext driverContext, Hive hive, Logger log) throws Exception { + int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); + Task evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); + Task taskChainTail = evTaskRoot; + Long lastReplayedEvent = null; + this.log = log; + + ReplLogger replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); + + while (iterator.hasNext() && maxTasks > 0) { + FileStatus dir = iterator.next(); + String location = dir.getPath().toUri().toString(); + DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf); + + if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) { + this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, maxTasks); + continue; + } + + this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, maxTasks); + + // event loads will behave similar to table loads, with one crucial difference + // precursor order is strict, and each event must be processed after the previous one. + // The way we handle this strict order is as follows: + // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask) + // at the head of our event chain. For each event we process, we tell analyzeTableLoad to + // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks + // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all + // these tasks. Then, this barrier task becomes our new taskChainTail. 
Thus, we get a set of + // tasks as follows: + // + // --->ev1.task1-- --->ev2.task1-- + // / \ / \ + // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail + // \ / + // --->ev1.task3-- + // + // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the + // entire chain + + MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location, + taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log); + List> evTasks = analyzeEventLoad(context); + + if ((evTasks != null) && (!evTasks.isEmpty())) { + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + dir.getPath().getName(), + eventDmd.getDumpType().toString()); + Task barrierTask = TaskFactory.get(replStateLogWork); + for (Task t : evTasks) { + t.addDependentTask(barrierTask); + this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", + t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); + } + this.log.debug("Updated taskChainTail from {}:{} to {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); + taskChainTail = barrierTask; + maxTasks -= evTasks.size(); + } + lastReplayedEvent = eventDmd.getEventTo(); + } + + // If any event is there and db name is known, then dump the start and end logs + if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) { + Map dbProps = new HashMap<>(); + dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + Task barrierTask = TaskFactory.get(replStateLogWork, conf); + taskChainTail.addDependentTask(barrierTask); + this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), + barrierTask.getClass(), barrierTask.getId()); + + replLogger.startLog(); + } + return evTaskRoot; + } + + private boolean isEventNotReplayed(Map params, FileStatus dir, DumpType dumpType) { + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { + log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) + + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); + return false; + } + } + return true; + } + + private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) { + // if database itself is null then we can not filter out anything. + if (dbName == null || dbName.isEmpty()) { + return true; + } else if ((tableName == null) || (tableName.isEmpty())) { + Database database; + try { + database = Hive.get().getDatabase(dbName); + return isEventNotReplayed(database.getParameters(), dir, dumpType); + } catch (HiveException e) { + //may be the db is getting created in this load + log.debug("failed to get the database " + dbName); + return true; + } + } else { + Table tbl; + try { + tbl = Hive.get().getTable(dbName, tableName); + return isEventNotReplayed(tbl.getParameters(), dir, dumpType); + } catch (HiveException e) { + // may be the table is getting created in this load + log.debug("failed to get the table " + dbName + "." 
+ tableName); + return true; + } + } + } + + private List> analyzeEventLoad(MessageHandler.Context context) throws SemanticException { + MessageHandler messageHandler = context.dmd.getDumpType().handler(); + List> tasks = messageHandler.handle(context); + + if (context.precursor != null) { + for (Task t : tasks) { + context.precursor.addDependentTask(t); + log.debug("Added {}:{} as a precursor of {}:{}", + context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); + } + } + + inputs.addAll(messageHandler.readEntities()); + outputs.addAll(messageHandler.writeEntities()); + return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); + } + + private Task tableUpdateReplStateTask(String dbName, String tableName, + Map partSpec, String replState, + Task preCursor) throws SemanticException { + HashMap mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); + alterTblDesc.setPartSpec((HashMap)partSpec); + + Task updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private Task dbUpdateReplStateTask(String dbName, String replState, + Task preCursor) { + HashMap mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState)); + Task updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private List> addUpdateReplStateTasks(boolean isDatabaseLoad, + UpdatedMetaDataTracker updatedMetadata, + List> importTasks) throws SemanticException { + String replState = updatedMetadata.getReplicationState(); + String database = updatedMetadata.getDatabase(); + String table = updatedMetadata.getTable(); + + // If no import tasks generated by the event or no table updated for table level load, then no + // need to update the repl state to any object. 
+ if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) { + log.debug("No objects need update of repl state: Either 0 import tasks or table level load"); + return importTasks; + } + + // Create a barrier task for dependency collection of import tasks + Task barrierTask = TaskFactory.get(new DependencyCollectionWork()); + + // Link import tasks to the barrier task which will in-turn linked with repl state update tasks + for (Task t : importTasks){ + t.addDependentTask(barrierTask); + log.debug("Added {}:{} as a precursor of barrier task {}:{}", + t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); + } + + List> tasks = new ArrayList<>(); + Task updateReplIdTask; + + // If any partition is updated, then update repl state in partition object + for (final Map partSpec : updatedMetadata.getPartitions()) { + updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + if (table != null) { + // If any table/partition is updated, then update repl state in table object + updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + // For table level load, need not update replication state for the database + if (isDatabaseLoad) { + // If any table/partition is updated, then update repl state in db object + updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + // At least one task would have been added to update the repl state + return tasks; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java index a91f45e204..cf54aa3709 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.GenTezWork; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index d7b3104cf1..659915283a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -18,46 +18,28 @@ package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; -import 
org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; -import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; -import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; -import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; -import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.stats.StatsUtils; import java.io.FileNotFoundException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -262,45 +244,6 @@ private void initReplLoad(ASTNode ast) throws SemanticException { } } - private boolean isEventNotReplayed(Map params, FileStatus dir, DumpType dumpType) { - if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { - String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { - LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) - + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); - return false; - } - } - return true; - } - - private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException { - // if database itself is null then we can not filter out anything. - if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) { - return true; - } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) { - Database database; - try { - database = Hive.get().getDatabase(dbNameOrPattern); - return isEventNotReplayed(database.getParameters(), dir, dumpType); - } catch (HiveException e) { - //may be the db is getting created in this load - LOG.debug("failed to get the database " + dbNameOrPattern); - return true; - } - } else { - Table tbl; - try { - tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern); - return isEventNotReplayed(tbl.getParameters(), dir, dumpType); - } catch (HiveException e) { - // may be the table is getting created in this load - LOG.debug("failed to get the table " + dbNameOrPattern + "." 
+ tblNameOrPattern); - return true; - } - } - } - /* * Example dump dirs we need to be able to handle : * @@ -388,7 +331,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState()); + tblNameOrPattern, queryState.getLineageState(), false); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -426,209 +369,17 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // analyzeDatabaseLoad(dbNameOrPattern, fs, dir); // } } else { - // Event dump, each sub-dir is an individual event dump. - // We need to guarantee that the directory listing we got is in order of evid. - Arrays.sort(dirsInLoadPath, new EventDumpDirComparator()); - - Task evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); - Task taskChainTail = evTaskRoot; - - ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern, - loadPath.toString(), dirsInLoadPath.length); - - for (FileStatus dir : dirsInLoadPath){ - String locn = dir.getPath().toUri().toString(); - DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); - - if (!shouldReplayEvent(dir, eventDmd.getDumpType())) { - continue; - } - - LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); - - // event loads will behave similar to table loads, with one crucial difference - // precursor order is strict, and each event must be processed after the previous one. - // The way we handle this strict order is as follows: - // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask) - // at the head of our event chain. For each event we process, we tell analyzeTableLoad to - // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks - // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all - // these tasks. Then, this barrier task becomes our new taskChainTail. 
Thus, we get a set of - // tasks as follows: - // - // --->ev1.task1-- --->ev2.task1-- - // / \ / \ - // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail - // \ / - // --->ev1.task3-- - // - // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the - // entire chain - - MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern, - tblNameOrPattern, locn, taskChainTail, - eventDmd, conf, db, ctx, LOG); - List> evTasks = analyzeEventLoad(context); - - if ((evTasks != null) && (!evTasks.isEmpty())){ - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, - dir.getPath().getName(), - eventDmd.getDumpType().toString()); - Task barrierTask = TaskFactory.get(replStateLogWork); - for (Task t : evTasks){ - t.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); - } - LOG.debug("Updated taskChainTail from {}:{} to {}:{}", - taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); - taskChainTail = barrierTask; - } - } - - // If any event is there and db name is known, then dump the start and end logs - if (!evTaskRoot.equals(taskChainTail)) { - Map dbProps = new HashMap<>(); - dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo())); - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); - Task barrierTask = TaskFactory.get(replStateLogWork, conf); - taskChainTail.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - taskChainTail.getClass(), taskChainTail.getId(), - barrierTask.getClass(), barrierTask.getId()); - - replLogger.startLog(); - } - rootTasks.add(evTaskRoot); + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + tblNameOrPattern, queryState.getLineageState(), true); + rootTasks.add(TaskFactory.get(replLoadWork, conf)); + return; } - } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes throw new SemanticException(e); } } - private List> analyzeEventLoad( - MessageHandler.Context context) - throws SemanticException { - MessageHandler messageHandler = context.dmd.getDumpType().handler(); - List> tasks = messageHandler.handle(context); - - if (context.precursor != null) { - for (Task t : tasks) { - context.precursor.addDependentTask(t); - LOG.debug("Added {}:{} as a precursor of {}:{}", - context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); - } - } - - inputs.addAll(messageHandler.readEntities()); - outputs.addAll(messageHandler.writeEntities()); - return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), - messageHandler.getUpdatedMetadata(), tasks); - } - - private Task tableUpdateReplStateTask( - String dbName, - String tableName, - Map partSpec, - String replState, - Task preCursor) throws SemanticException { - HashMap mapProp = new HashMap<>(); - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); - - AlterTableDesc alterTblDesc = new AlterTableDesc( - AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); - alterTblDesc.setProps(mapProp); - alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); - alterTblDesc.setPartSpec((HashMap)partSpec); - - Task updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterTblDesc), 
conf); - - // Link the update repl state task with dependency collection task - if (preCursor != null) { - preCursor.addDependentTask(updateReplIdTask); - LOG.debug("Added {}:{} as a precursor of {}:{}", - preCursor.getClass(), preCursor.getId(), - updateReplIdTask.getClass(), updateReplIdTask.getId()); - } - return updateReplIdTask; - } - - private Task dbUpdateReplStateTask( - String dbName, - String replState, - Task preCursor) { - HashMap mapProp = new HashMap<>(); - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); - - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc( - dbName, mapProp, new ReplicationSpec(replState, replState)); - Task updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterDbDesc), conf); - - // Link the update repl state task with dependency collection task - if (preCursor != null) { - preCursor.addDependentTask(updateReplIdTask); - LOG.debug("Added {}:{} as a precursor of {}:{}", - preCursor.getClass(), preCursor.getId(), - updateReplIdTask.getClass(), updateReplIdTask.getId()); - } - return updateReplIdTask; - } - - private List> addUpdateReplStateTasks( - boolean isDatabaseLoad, - UpdatedMetaDataTracker updatedMetadata, - List> importTasks) throws SemanticException { - String replState = updatedMetadata.getReplicationState(); - String dbName = updatedMetadata.getDatabase(); - String tableName = updatedMetadata.getTable(); - - // If no import tasks generated by the event or no table updated for table level load, then no - // need to update the repl state to any object. - if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) { - LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load"); - return importTasks; - } - - // Create a barrier task for dependency collection of import tasks - Task barrierTask = TaskFactory.get(new DependencyCollectionWork()); - - // Link import tasks to the barrier task which will in-turn linked with repl state update tasks - for (Task t : importTasks){ - t.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); - } - - List> tasks = new ArrayList<>(); - Task updateReplIdTask; - - // If any partition is updated, then update repl state in partition object - for (final Map partSpec : updatedMetadata.getPartitions()) { - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); - tasks.add(updateReplIdTask); - } - - if (tableName != null) { - // If any table/partition is updated, then update repl state in table object - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask); - tasks.add(updateReplIdTask); - } - - // For table level load, need not update replication state for the database - if (isDatabaseLoad) { - // If any table/partition is updated, then update repl state in db object - updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); - tasks.add(updateReplIdTask); - } - - // At least one task would have been added to update the repl state - return tasks; - } - // REPL STATUS private void initReplStatus(ASTNode ast) throws SemanticException{ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
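
Reviewer note (illustrative, not part of the patch): the essence of this change is that incremental REPL LOAD is no longer planned in one shot inside ReplicationSemanticAnalyzer; instead ReplLoadTask/IncrementalLoad build tasks for at most hive.repl.approx.max.load.tasks events per run and, when the event iterator still has entries, chain another ReplLoadTask at the tail of the DAG so the next run resumes where this one stopped. The sketch below shows only that batching-and-continuation idea in plain Java; the class and method names (EventBatchLoaderSketch, executeOnce) are hypothetical stand-ins, not Hive APIs, and the real code builds Hive Task DAGs via MessageHandler.handle and chains the next load via AddDependencyToLeaves.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for ReplLoadTask + IncrementalLoad; names are illustrative only.
public class EventBatchLoaderSketch {
  private final Deque<String> pendingEventDirs;  // plays the role of IncrementalLoadEventsIterator
  private final int maxTasksPerRun;              // plays the role of hive.repl.approx.max.load.tasks

  public EventBatchLoaderSketch(List<String> eventDirsSortedByEventId, int maxTasksPerRun) {
    this.pendingEventDirs = new ArrayDeque<>(eventDirsSortedByEventId);
    this.maxTasksPerRun = maxTasksPerRun;
  }

  // One "iteration" of the load: build work for at most maxTasksPerRun events.
  public List<String> executeOnce() {
    List<String> builtTasks = new ArrayList<>();
    while (!pendingEventDirs.isEmpty() && builtTasks.size() < maxTasksPerRun) {
      builtTasks.add("load event " + pendingEventDirs.poll());
    }
    return builtTasks;
  }

  public boolean hasMoreEvents() {
    return !pendingEventDirs.isEmpty();
  }

  public static void main(String[] args) {
    EventBatchLoaderSketch loader =
        new EventBatchLoaderSketch(Arrays.asList("21", "22", "23", "24", "25"), 2);
    int iteration = 0;
    while (true) {
      System.out.println("iteration " + (++iteration) + ": " + loader.executeOnce());
      if (!loader.hasMoreEvents()) {
        break;  // nothing left, so no continuation task is chained
      }
      // At this point the real ReplLoadTask appends another ReplLoadTask to the DAG tail
      // (createBuilderTask), so the next iteration resumes from the saved iterator position.
    }
  }
}

This bounded-batch behaviour is also why testIncrementalDumpMultiIteration above loads with 'hive.repl.approx.max.load.tasks' set to 10 and then 1: the small limits force the load to run as several chained iterations while still producing the same replicated data.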