diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index a1498cac5c..4892486b52 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -345,7 +345,7 @@ public void testTxnEventNonAcid() throws Throwable { WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); - replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation) + replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'") .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); } @@ -395,10 +395,8 @@ public Boolean apply(@Nullable CallerArguments args) { replica.run("use " + replicatedDbName) .run("repl status " + replicatedDbName) .verifyResult("null") - .run("show tables") - .verifyResults(new String[] { "t1" }) - .run("select id from t1") - .verifyResults(Arrays.asList("1")); + .run("show tables like t2") + .verifyResults(new String[] { }); // Retry with different dump should fail. replica.loadFailure(replicatedDbName, tuple2.dumpLocation); @@ -413,10 +411,6 @@ public Boolean apply(@Nullable CallerArguments args) { LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); return false; } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); - return args.tblName.equals("t2"); - } return true; } }; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index c5f35e2c7d..e992fb4e11 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; @@ -45,6 +45,8 @@ import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.junit.Assert; import java.io.IOException; import java.net.URI; @@ -77,10 +79,11 @@ protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private static WarehouseInstance primary, replica; private String primaryDbName, replicatedDbName; + private static HiveConf conf; @BeforeClass public static void classLevelSetup() throws Exception { - Configuration conf = new Configuration(); + conf = new HiveConf(TestReplicationScenarios.class); conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = @@ -875,6 +878,91 @@ private void verifyIfSrcOfReplPropMissing(Map props) { assertFalse(props.containsKey(SOURCE_OF_REPLICATION)); } + @Test + public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "uk", "us")); + + Database db = replica.getDatabase(replicatedDbName); + verifyIfCkptSet(db.getParameters(), tuple.dumpLocation); + Table t1 = replica.getTable(replicatedDbName, "t1"); + verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation); + Table t2 = replica.getTable(replicatedDbName, "t2"); + verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation); + Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india")); + verifyIfCkptSet(india.getParameters(), tuple.dumpLocation); + Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us")); + verifyIfCkptSet(us.getParameters(), tuple.dumpLocation); + Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); + verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); + } + + @Test + public void testIncrementalDumpMultiIteration() throws Throwable { + WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(bootstrapTuple.lastReplicationId); + + WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName) + .run("create table table1 (id int) partitioned by (country string)") + .run("create table table2 (id int)") + .run("create table table3 (id int) partitioned by (country string)") + .run("insert into table1 partition(country='india') values(1)") + .run("insert into table2 values(2)") + .run("insert into table3 partition(country='india') values(3)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'")) + .status(replicatedDbName) + .verifyResult(incremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[] {"1" }) + .run("select * from table2") + .verifyResults(new String[] {"2" }) + .run("select id from table3") + .verifyResults(new String[] {"3" }); + assert(IncrementalLoadTasksBuilder.getNumIteration() > 1); + + incremental = primary.run("use " + primaryDbName) + .run("create table table5 (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc") + .run("create table table4 (i int, j int)") + .run("insert into table4 values (1,2)") + .dump(primaryDbName, incremental.lastReplicationId); + + 
Path path = new Path(incremental.dumpLocation); + FileSystem fs = path.getFileSystem(conf); + FileStatus[] fileStatus = fs.listStatus(path); + int numEvents = fileStatus.length - 1; //one is metadata file + + replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'")) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" }) + .run("select i from table4") + .verifyResult("1"); + Assert.assertEquals(IncrementalLoadTasksBuilder.getNumIteration(), numEvents); + } + @Test public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary @@ -1087,9 +1175,7 @@ public Boolean apply(@Nullable CallerArguments args) { replica.run("use " + replicatedDbName) .run("repl status " + replicatedDbName) - .verifyResult("null") - .run("show tables") - .verifyResults(new String[] { "t1" }); + .verifyResult("null"); assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); @@ -1109,10 +1195,6 @@ public Boolean apply(@Nullable CallerArguments args) { LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName)); return false; } - if (args.tblName != null) { - LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); - return (args.tblName.equals("t2") || args.tblName.equals("t3")); - } if (args.constraintTblName != null) { LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3")); @@ -1179,8 +1261,6 @@ public Boolean apply(@Nullable CallerArguments args) { public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) - .run("create table t1 (id int)") - .run("insert into table t1 values (10)") .run("create table t2 (place string) partitioned by (country string)") .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='uk') values ('london')") @@ -1195,7 +1275,7 @@ public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwab .dump(primaryDbName, null); // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". - // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed. + // So, table "t2" will exist and partition "india" will exist, rest failed as operation failed. 
BehaviourInjection getPartitionStub = new BehaviourInjection() { @Nullable @@ -1220,9 +1300,7 @@ public Partition apply(@Nullable Partition ptn) { .run("repl status " + replicatedDbName) .verifyResult("null") .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select id from t1") - .verifyResults(Arrays.asList("10")) + .verifyResults(new String[] {"t2" }) .run("select country from t2 order by country") .verifyResults(Arrays.asList("india")) .run("show functions like '" + replicatedDbName + "*'") @@ -1259,9 +1337,7 @@ public Boolean apply(@Nullable CallerArguments args) { .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select id from t1") - .verifyResults(Arrays.asList("10")) + .verifyResults(new String[] { "t2" }) .run("select country from t2 order by country") .verifyResults(Arrays.asList("india", "uk", "us")) .run("show functions like '" + replicatedDbName + "*'") diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index ad778e3245..d43eed3153 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -103,7 +103,8 @@ enum StageType { REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, - REPL_TXN + REPL_TXN, + REPL_INCREMENTAL_LOAD } struct Stage { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index b6eb12ab13..73bbe3a377 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -119,7 +119,8 @@ int _kStageTypeValues[] = { StageType::REPL_DUMP, StageType::REPL_BOOTSTRAP_LOAD, StageType::REPL_STATE_LOG, - StageType::REPL_TXN + StageType::REPL_TXN, + StageType::REPL_INCREMENTAL_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -137,9 +138,10 @@ const char* _kStageTypeNames[] = { "REPL_DUMP", "REPL_BOOTSTRAP_LOAD", "REPL_STATE_LOG", - "REPL_TXN" + "REPL_TXN", + "REPL_INCREMENTAL_LOAD" }; -const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index eb02107e64..04c749f1eb 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -97,7 +97,8 @@ struct StageType { REPL_DUMP = 12, REPL_BOOTSTRAP_LOAD = 13, REPL_STATE_LOG = 14, - REPL_TXN = 15 + REPL_TXN = 15, + REPL_INCREMENTAL_LOAD = 16 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 08822b3cc7..7eebe28732 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -27,7 +27,8 @@ REPL_DUMP(12), REPL_BOOTSTRAP_LOAD(13), REPL_STATE_LOG(14), - REPL_TXN(15); + REPL_TXN(15), + REPL_INCREMENTAL_LOAD(16); private final int value; @@ -80,6 +81,8 @@ public static StageType findByValue(int value) { return REPL_STATE_LOG; case 15: return REPL_TXN; + case 16: + return REPL_INCREMENTAL_LOAD; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php 
b/ql/src/gen/thrift/gen-php/Types.php index df4e41db93..1a36d08f92 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -118,6 +118,7 @@ final class StageType { const REPL_BOOTSTRAP_LOAD = 13; const REPL_STATE_LOG = 14; const REPL_TXN = 15; + const REPL_INCREMENTAL_LOAD = 16; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -135,6 +136,7 @@ final class StageType { 13 => 'REPL_BOOTSTRAP_LOAD', 14 => 'REPL_STATE_LOG', 15 => 'REPL_TXN', + 16 => 'REPL_INCREMENTAL_LOAD', ); } diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 85d39fdbe1..c0a22044a7 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -164,6 +164,7 @@ class StageType: REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 REPL_TXN = 15 + REPL_INCREMENTAL_LOAD = 16 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -182,6 +183,7 @@ class StageType: 13: "REPL_BOOTSTRAP_LOAD", 14: "REPL_STATE_LOG", 15: "REPL_TXN", + 16: "REPL_INCREMENTAL_LOAD", } _NAMES_TO_VALUES = { @@ -201,6 +203,7 @@ class StageType: "REPL_BOOTSTRAP_LOAD": 13, "REPL_STATE_LOG": 14, "REPL_TXN": 15, + "REPL_INCREMENTAL_LOAD": 16, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 6010f3d21e..61349a2191 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -76,8 +76,9 @@ module StageType REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 REPL_TXN = 15 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze + REPL_INCREMENTAL_LOAD = 16 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze end class Adjacency diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 3a107b7e81..47a802f4f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import 
org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java similarity index 90% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index b33a774307..09e7ba1472 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; +package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; @@ -34,7 +34,8 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; @@ -57,7 +58,7 @@ @Override public String getName() { - return "REPL_BOOTSTRAP_LOAD"; + return (work.isIncrementalLoad() ? 
"REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD"); } /** @@ -71,6 +72,14 @@ public String getName() { @Override protected int execute(DriverContext driverContext) { + if (work.isIncrementalLoad()) { + return executeIncrementalLoad(driverContext); + } else { + return executeBootStrapLoad(driverContext); + } + } + + private int executeBootStrapLoad(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); Context context = new Context(work.dumpDirectory, conf, getHive(), @@ -206,7 +215,9 @@ a database ( directory ) } boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() || constraintIterator.hasNext(); - createBuilderTask(scope.rootTasks, addAnotherLoadTask); + if (addAnotherLoadTask) { + createBuilderTask(scope.rootTasks); + } if (!iterator.hasNext() && !constraintIterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); work.updateDbEventState(null); @@ -221,8 +232,11 @@ a database ( directory ) // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs()); + } catch (RuntimeException e) { + LOG.error("replication failed with run time exception", e); + throw e; } catch (Exception e) { - LOG.error("failed replication", e); + LOG.error("replication failed", e); setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -301,19 +315,26 @@ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) } } - private void createBuilderTask(List> rootTasks, - boolean shouldCreateAnotherLoadTask) { - /* - use loadTask as dependencyCollection - */ - if (shouldCreateAnotherLoadTask) { - Task loadTask = TaskFactory.get(work, conf); - DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); - } + private void createBuilderTask(List> rootTasks) { + // Use loadTask as dependencyCollection + Task loadTask = TaskFactory.get(work, conf); + DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask)); } @Override public StageType getType() { - return StageType.REPL_BOOTSTRAP_LOAD; + return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD; + } + + private int executeIncrementalLoad(DriverContext driverContext) { + try { + IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder(); + this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG, work)); + return 0; + } catch (Exception e) { + LOG.error("failed replication", e); + setException(e); + return 1; + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java similarity index 66% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 048727fd94..8921e948fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -15,12 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; +package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; @@ -34,10 +36,12 @@ final String dbNameToLoadIn; final String tableNameToLoadIn; final String dumpDirectory; - private final BootstrapEventsIterator iterator; + private final transient BootstrapEventsIterator bootstrapIterator; private final ConstraintEventsIterator constraintsIterator; + private final transient IncrementalLoadEventsIterator incrementalIterator; private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; + private final transient IncrementalLoadTasksBuilder incrementalLoad; /* these are sessionState objects that are copied over to work to allow for parallel execution. @@ -47,23 +51,32 @@ final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState) - throws IOException { + String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; - this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); - this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; + if (isIncrementalDump) { + incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf); + this.bootstrapIterator = null; + this.constraintsIterator = null; + incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, + incrementalIterator, hiveConf); + } else { + this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); + incrementalIterator = null; + incrementalLoad = null; + } } public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, LineageState lineageState) throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState); + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false); } public BootstrapEventsIterator iterator() { - return iterator; + return bootstrapIterator; } public ConstraintEventsIterator constraintIterator() { @@ -85,4 +98,16 @@ DatabaseEvent databaseEvent(HiveConf hiveConf) { boolean hasDbState() { return state != null; } + + public boolean isIncrementalLoad() { + return incrementalIterator != null; + } + + public IncrementalLoadEventsIterator getIncrementalIterator() { + return incrementalIterator; + } + + public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() { + return incrementalLoad; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 89d2ac23d0..ebe0090ab2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -82,6 +82,15 @@ public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, Hive FileSystem fileSystem = path.getFileSystem(hiveConf); FileStatus[] fileStatuses = fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem)); + if ((fileStatuses == null) || (fileStatuses.length == 0)) { + throw new IllegalArgumentException("No data to load in path " + dumpDirectory); + } + if ((dbNameToLoadIn != null) && (fileStatuses.length > 1)) { + throw new IllegalArgumentException( + "Multiple dirs in " + + dumpDirectory + + " does not correspond to REPL LOAD expecting to load to a singular destination point."); + } List dbsToCreate = Arrays.stream(fileStatuses).filter(f -> { Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index 26f4892e33..d09b98c6e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 0270d2afec..054153ca8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -31,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import java.io.Serializable; import java.util.HashMap; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index b886ff4344..a7c8ca4558 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -26,9 +26,10 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.EximUtil; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index f6493f71d4..c0cfc439d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 419a511787..089b529b7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -27,10 +27,10 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
index b5b5b90dc6..8e01fb1e6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
new file mode 100644
index 0000000000..4b37c8dd98
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * IncrementalLoadEventsIterator
+ * Helper class to iterate through event dump directory.
+ */
+public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
+  private FileStatus[] eventDirs;
+  private int currentIndex;
+  private int numEvents;
+
+  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException {
+    Path eventPath = new Path(loadPath);
+    FileSystem fs = eventPath.getFileSystem(conf);
+    eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+    if ((eventDirs == null) || (eventDirs.length == 0)) {
+      throw new IllegalArgumentException("No data to load in path " + loadPath);
+    }
+    // For event dump, each sub-dir is an individual event dump.
+    // We need to guarantee that the directory listing we got is in order of event id.
+    Arrays.sort(eventDirs, new EventDumpDirComparator());
+    currentIndex = 0;
+    numEvents = eventDirs.length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (eventDirs != null && currentIndex < numEvents);
+  }
+
+  @Override
+  public FileStatus next() {
+    if (hasNext()) {
+      return eventDirs[currentIndex++];
+    } else {
+      throw new NoSuchElementException("no more events");
+    }
+  }
+
+  public int getNumEvents() {
+    return numEvents;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
new file mode 100644
index 0000000000..d21168de23
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import
java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.HashSet; + +/** + * IncrementalLoad + * Iterate through the dump directory and create tasks to load the events. + */ +public class IncrementalLoadTasksBuilder { + private final String dbName, tableName; + private final IncrementalLoadEventsIterator iterator; + private HashSet inputs; + private HashSet outputs; + private Logger log; + private final HiveConf conf; + private final ReplLogger replLogger; + private static long numIteration; + + public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, + IncrementalLoadEventsIterator iterator, HiveConf conf) { + this.dbName = dbName; + this.tableName = tableName; + this.iterator = iterator; + inputs = new HashSet<>(); + outputs = new HashSet<>(); + log = null; + this.conf = conf; + replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); + numIteration = 0; + replLogger.startLog(); + } + + public Task build(DriverContext driverContext, Hive hive, Logger log, + ReplLoadWork loadWork) throws Exception { + Task evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); + Task taskChainTail = evTaskRoot; + Long lastReplayedEvent = null; + this.log = log; + numIteration++; + this.log.debug("Iteration num " + numIteration); + TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); + + while (iterator.hasNext() && tracker.canAddMoreTasks()) { + FileStatus dir = iterator.next(); + String location = dir.getPath().toUri().toString(); + DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf); + + if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) { + this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + continue; + } + + this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + + // event loads will behave similar to table loads, with one crucial difference + // precursor order is strict, and each event must be processed after the previous one. + // The way we handle this strict order is as follows: + // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask) + // at the head of our event chain. For each event we process, we tell analyzeTableLoad to + // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks + // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all + // these tasks. Then, this barrier task becomes our new taskChainTail. 
Thus, we get a set of + // tasks as follows: + // + // --->ev1.task1-- --->ev2.task1-- + // / \ / \ + // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail + // \ / + // --->ev1.task3-- + // + // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the + // entire chain + + MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location, + taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log); + List> evTasks = analyzeEventLoad(context); + + if ((evTasks != null) && (!evTasks.isEmpty())) { + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + dir.getPath().getName(), + eventDmd.getDumpType().toString()); + Task barrierTask = TaskFactory.get(replStateLogWork); + AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask); + DAGTraversal.traverse(evTasks, function); + this.log.debug("Updated taskChainTail from {}:{} to {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); + tracker.addTaskList(taskChainTail.getChildTasks()); + taskChainTail = barrierTask; + } + lastReplayedEvent = eventDmd.getEventTo(); + } + + if (iterator.hasNext()) { + // add load task to start the next iteration + taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf)); + } else { + Map dbProps = new HashMap<>(); + dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + Task barrierTask = TaskFactory.get(replStateLogWork, conf); + taskChainTail.addDependentTask(barrierTask); + this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), + barrierTask.getClass(), barrierTask.getId()); + } + log.info("Iteration " + numIteration + " done with num task : " + + tracker.numberOfTasks() + ", lastReplayedEvent : " + lastReplayedEvent); + return evTaskRoot; + } + + private boolean isEventNotReplayed(Map params, FileStatus dir, DumpType dumpType) { + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { + log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) + + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); + return false; + } + } + return true; + } + + private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) { + // if database itself is null then we can not filter out anything. + if (dbName == null || dbName.isEmpty()) { + return true; + } else if ((tableName == null) || (tableName.isEmpty())) { + Database database; + try { + database = Hive.get().getDatabase(dbName); + return isEventNotReplayed(database.getParameters(), dir, dumpType); + } catch (HiveException e) { + //may be the db is getting created in this load + log.debug("failed to get the database " + dbName); + return true; + } + } else { + Table tbl; + try { + tbl = Hive.get().getTable(dbName, tableName); + return isEventNotReplayed(tbl.getParameters(), dir, dumpType); + } catch (HiveException e) { + // may be the table is getting created in this load + log.debug("failed to get the table " + dbName + "." 
+ tableName); + return true; + } + } + } + + private List> analyzeEventLoad(MessageHandler.Context context) throws SemanticException { + MessageHandler messageHandler = context.dmd.getDumpType().handler(); + List> tasks = messageHandler.handle(context); + + if (context.precursor != null) { + for (Task t : tasks) { + context.precursor.addDependentTask(t); + log.debug("Added {}:{} as a precursor of {}:{}", + context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); + } + } + + inputs.addAll(messageHandler.readEntities()); + outputs.addAll(messageHandler.writeEntities()); + return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); + } + + private Task tableUpdateReplStateTask(String dbName, String tableName, + Map partSpec, String replState, + Task preCursor) throws SemanticException { + HashMap mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); + alterTblDesc.setPartSpec((HashMap)partSpec); + + Task updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private Task dbUpdateReplStateTask(String dbName, String replState, + Task preCursor) { + HashMap mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState)); + Task updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private List> addUpdateReplStateTasks(boolean isDatabaseLoad, + UpdatedMetaDataTracker updatedMetadata, + List> importTasks) throws SemanticException { + String replState = updatedMetadata.getReplicationState(); + String database = updatedMetadata.getDatabase(); + String table = updatedMetadata.getTable(); + + // If no import tasks generated by the event or no table updated for table level load, then no + // need to update the repl state to any object. 
+ if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) { + log.debug("No objects need update of repl state: Either 0 import tasks or table level load"); + return importTasks; + } + + // Create a barrier task for dependency collection of import tasks + Task barrierTask = TaskFactory.get(new DependencyCollectionWork()); + + // Link import tasks to the barrier task which will in-turn linked with repl state update tasks + for (Task t : importTasks){ + t.addDependentTask(barrierTask); + log.debug("Added {}:{} as a precursor of barrier task {}:{}", + t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); + } + + List> tasks = new ArrayList<>(); + Task updateReplIdTask; + + // If any partition is updated, then update repl state in partition object + for (final Map partSpec : updatedMetadata.getPartitions()) { + updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + if (table != null) { + // If any table/partition is updated, then update repl state in table object + updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + // For table level load, need not update replication state for the database + if (isDatabaseLoad) { + // If any table/partition is updated, then update repl state in db object + updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + // At least one task would have been added to update the repl state + return tasks; + } + + public static long getNumIteration() { + return numIteration; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java similarity index 91% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java index 0313058b0d..284796f695 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.exec.repl.bootstrap; +package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; @@ -28,7 +28,7 @@ public class AddDependencyToLeaves implements DAGTraversal.Function { private List> postDependencyCollectionTasks; - AddDependencyToLeaves(List> postDependencyCollectionTasks) { + public AddDependencyToLeaves(List> postDependencyCollectionTasks) { this.postDependencyCollectionTasks = postDependencyCollectionTasks; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java similarity index 98% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 18a83043a6..618be1dd5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -15,12 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
 */
-package org.apache.hadoop.hive.ql.exec.repl;
+package org.apache.hadoop.hive.ql.exec.repl.util;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
similarity index 90%
rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
index f8f0801d79..1d01bc9cd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+package org.apache.hadoop.hive.ql.exec.repl.util;
 
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +66,16 @@ public void addTask(Task<? extends Serializable> task) {
     updateTaskCount(task, visited);
   }
 
+  public void addTaskList(List<Task<? extends Serializable>> taskList) {
+    List<Task<? extends Serializable>> visited = new ArrayList<>();
+    for (Task<? extends Serializable> task : taskList) {
+      if (!visited.contains(task)) {
+        tasks.add(task);
+        updateTaskCount(task, visited);
+      }
+    }
+  }
+
   // This method is used to traverse the DAG created in tasks list and add the dependent task to
   // the tail of each task chain.
public void addDependentTask(Task dependent) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java index a91f45e204..cf54aa3709 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.GenTezWork; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 0a5ecf9629..0a535d15c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index f37de3e808..9131a59c21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,34 +29,18 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; -import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; -import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; -import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; -import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import 
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 import java.io.FileNotFoundException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
+
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -266,45 +249,6 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
     }
   }
 
-  private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
-    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-      if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
-        LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
-                + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
-    // if database itself is null then we can not filter out anything.
-    if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
-      return true;
-    } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) {
-      Database database;
-      try {
-        database = Hive.get().getDatabase(dbNameOrPattern);
-        return isEventNotReplayed(database.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        //may be the db is getting created in this load
-        LOG.debug("failed to get the database " + dbNameOrPattern);
-        return true;
-      }
-    } else {
-      Table tbl;
-      try {
-        tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
-        return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        // may be the table is getting created in this load
-        LOG.debug("failed to get the table " + dbNameOrPattern + "."
-                + tblNameOrPattern);
-        return true;
-      }
-    }
-  }
-
   /*
    * Example dump dirs we need to be able to handle :
   *
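The two helpers removed above carried the idempotency check for event replay: an event directory is skipped when the target database or table already records a repl.last.id (ReplicationSpec.KEY.CURR_STATE_ID) at or beyond the event id encoded in the directory name. A self-contained sketch of just that comparison, with the property key hard-coded purely to keep the sketch standalone (an assumption, not a reference to the real constant):

  import java.util.Map;

  class ReplayCheckSketch {
    // Stand-in for ReplicationSpec.KEY.CURR_STATE_ID.toString(); literal used for illustration only.
    private static final String CURR_STATE_ID = "repl.last.id";

    // Returns true when the event still has to be applied on the target object.
    static boolean isEventNotReplayed(Map<String, String> params, String eventDirName) {
      if (params != null && params.containsKey(CURR_STATE_ID)) {
        long replLastId = Long.parseLong(params.get(CURR_STATE_ID));
        long eventId = Long.parseLong(eventDirName);
        return replLastId < eventId;   // last id >= event id means it was already replayed
      }
      return true;                     // no state recorded yet, so the event must be replayed
    }
  }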
@@ -396,7 +340,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
     if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-              tblNameOrPattern, queryState.getLineageState());
+              tblNameOrPattern, queryState.getLineageState(), false);
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
       return;
     }
@@ -407,236 +351,15 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         return;
       }
 
-      FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
-
-      if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
-        throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
-      }
-
-      if (!evDump){
-        // not an event dump, not a table dump - thus, a db dump
-        if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
-          LOG.debug("Found multiple dirs when we expected 1:");
-          for (FileStatus d : dirsInLoadPath) {
-            LOG.debug("> " + d.getPath().toUri().toString());
-          }
-          throw new IllegalArgumentException(
-              "Multiple dirs in "
-                  + loadPath.toUri().toString()
-                  + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
-        }
-
-        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-            queryState.getLineageState());
-        rootTasks.add(TaskFactory.get(replLoadWork, conf));
-        //
-        //    for (FileStatus dir : dirsInLoadPath) {
-        //      analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
-        //    }
-      } else {
-        // Event dump, each sub-dir is an individual event dump.
-        // We need to guarantee that the directory listing we got is in order of evid.
-        Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
-
-        Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
-        Task<? extends Serializable> taskChainTail = evTaskRoot;
-
-        ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern,
-            loadPath.toString(), dirsInLoadPath.length);
-
-        for (FileStatus dir : dirsInLoadPath){
-          String locn = dir.getPath().toUri().toString();
-          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
-
-          if (!shouldReplayEvent(dir, eventDmd.getDumpType())) {
-            continue;
-          }
-
-          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
-
-          // event loads will behave similar to table loads, with one crucial difference
-          // precursor order is strict, and each event must be processed after the previous one.
-          // The way we handle this strict order is as follows:
-          // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask)
-          // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
-          // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
-          // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
-          // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
-          // tasks as follows:
-          //
-          //    --->ev1.task1--            --->ev2.task1--
-          //   /               \          /               \
-          //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
-          //   \               /
-          //    --->ev1.task3--
-          //
-          // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
-          // entire chain
-
-          MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern,
-              tblNameOrPattern, locn, taskChainTail,
-              eventDmd, conf, db, ctx, LOG);
-          List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
-
-          if ((evTasks != null) && (!evTasks.isEmpty())){
-            ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
-                dir.getPath().getName(),
-                eventDmd.getDumpType().toString());
-            Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
-            for (Task<? extends Serializable> t : evTasks){
-              t.addDependentTask(barrierTask);
-              LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-                  t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-            }
-            LOG.debug("Updated taskChainTail from {}:{} to {}:{}",
-                taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
-            taskChainTail = barrierTask;
-          }
-        }
-
-        // If any event is there and db name is known, then dump the start and end logs
-        if (!evTaskRoot.equals(taskChainTail)) {
-          Map<String, String> dbProps = new HashMap<>();
-          dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo()));
-          ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
-          Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
-          taskChainTail.addDependentTask(barrierTask);
-          LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-              taskChainTail.getClass(), taskChainTail.getId(),
-              barrierTask.getClass(), barrierTask.getId());
-
-          replLogger.startLog();
-        }
-        rootTasks.add(evTaskRoot);
-      }
-
+      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+          tblNameOrPattern, queryState.getLineageState(), evDump);
+      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
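With the lines above, both branches of analyzeReplLoad now hand off to a single ReplLoadWork whose last argument records whether this is an event (incremental) dump, and the task DAG is built by the load task instead of by the analyzer. One practical consequence is that an incremental load can be generated in bounded batches rather than one giant chain: tasks are produced until an approximate per-iteration budget is hit and the remainder is picked up by a follow-on iteration. A rough, self-contained sketch of that batching idea; the Event type, the budget value, and buildOneIteration are hypothetical placeholders, not Hive APIs:

  import java.util.ArrayDeque;
  import java.util.ArrayList;
  import java.util.Deque;
  import java.util.List;

  class BatchedLoadSketch {
    // Stand-in for one dumped event directory.
    static class Event { final long id; Event(long id) { this.id = id; } }

    // Generate work for events in order, stopping once the per-iteration budget is used up.
    // Remaining events stay on the queue for the next iteration to pick up.
    static List<String> buildOneIteration(Deque<Event> pendingEvents, int maxTasksPerIteration) {
      List<String> tasksForThisIteration = new ArrayList<>();
      while (!pendingEvents.isEmpty() && tasksForThisIteration.size() < maxTasksPerIteration) {
        Event next = pendingEvents.poll();
        tasksForThisIteration.add("apply-event-" + next.id);   // placeholder for real load tasks
      }
      return tasksForThisIteration;
    }

    public static void main(String[] args) {
      Deque<Event> events = new ArrayDeque<>();
      for (long id = 100; id < 125; id++) {
        events.add(new Event(id));
      }
      int iterations = 0;
      while (!events.isEmpty()) {
        buildOneIteration(events, 10);
        iterations++;               // 25 events with a budget of 10 take 3 iterations
      }
      System.out.println("iterations = " + iterations);
    }
  }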
-  private List<Task<? extends Serializable>> analyzeEventLoad(
-      MessageHandler.Context context)
-      throws SemanticException {
-    MessageHandler messageHandler = context.dmd.getDumpType().handler();
-    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
-
-    if (context.precursor != null) {
-      for (Task<? extends Serializable> t : tasks) {
-        context.precursor.addDependentTask(t);
-        LOG.debug("Added {}:{} as a precursor of {}:{}",
-            context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
-      }
-    }
-
-    inputs.addAll(messageHandler.readEntities());
-    outputs.addAll(messageHandler.writeEntities());
-    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
-        messageHandler.getUpdatedMetadata(), tasks);
-  }
-
-  private Task<? extends Serializable> tableUpdateReplStateTask(
-      String dbName,
-      String tableName,
-      Map<String, String> partSpec,
-      String replState,
-      Task<? extends Serializable> preCursor) throws SemanticException {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterTableDesc alterTblDesc = new AlterTableDesc(
-        AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
-    alterTblDesc.setProps(mapProp);
-    alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
-    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
-
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterTblDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-          preCursor.getClass(), preCursor.getId(),
-          updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private Task<? extends Serializable> dbUpdateReplStateTask(
-      String dbName,
-      String replState,
-      Task<? extends Serializable> preCursor) {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
-        dbName, mapProp, new ReplicationSpec(replState, replState));
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterDbDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-          preCursor.getClass(), preCursor.getId(),
-          updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
-      boolean isDatabaseLoad,
-      UpdatedMetaDataTracker updatedMetadata,
-      List<Task<? extends Serializable>> importTasks) throws SemanticException {
-    String replState = updatedMetadata.getReplicationState();
-    String dbName = updatedMetadata.getDatabase();
-    String tableName = updatedMetadata.getTable();
-
-    // If no import tasks generated by the event or no table updated for table level load, then no
-    // need to update the repl state to any object.
-    if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
-      LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
-      return importTasks;
-    }
-
-    // Create a barrier task for dependency collection of import tasks
-    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
-    // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
-    for (Task<? extends Serializable> t : importTasks){
-      t.addDependentTask(barrierTask);
-      LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-          t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-    }
-
-    List<Task<? extends Serializable>> tasks = new ArrayList<>();
-    Task<? extends Serializable> updateReplIdTask;
-
-    // If any partition is updated, then update repl state in partition object
-    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
-      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    if (tableName != null) {
-      // If any table/partition is updated, then update repl state in table object
-      updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    // For table level load, need not update replication state for the database
-    if (isDatabaseLoad) {
-      // If any table/partition is updated, then update repl state in db object
-      updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
-      tasks.add(updateReplIdTask);
-    }
-
-    // At least one task would have been added to update the repl state
-    return tasks;
-  }
-
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
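The removed ReplicationSemanticAnalyzer code above is where the strict per-event ordering used to be wired up: every event's tasks were fanned out from the current chain tail and funnelled back into a barrier, and that barrier became the tail for the next event, exactly as the deleted ASCII diagram shows. A small standalone sketch of the chaining itself; Node and its dependents list are illustrative stand-ins for Hive's Task and addDependentTask, not the real classes:

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  class BarrierChainSketch {
    // Minimal stand-in for a task: a name plus the tasks that may only run after it.
    static class Node {
      final String name;
      final List<Node> dependents = new ArrayList<>();
      Node(String name) { this.name = name; }
      void addDependentTask(Node d) { dependents.add(d); }
    }

    public static void main(String[] args) {
      Node root = new Node("evTaskRoot");          // no-op head of the chain
      Node tail = root;
      int eventId = 0;
      for (List<String> eventTasks : Arrays.asList(
          Arrays.asList("ev1.task1", "ev1.task2", "ev1.task3"),
          Arrays.asList("ev2.task1", "ev2.task2"))) {
        eventId++;
        Node barrier = new Node("ev" + eventId + ".barrier");
        for (String t : eventTasks) {
          Node task = new Node(t);
          tail.addDependentTask(task);             // every task of this event waits on the tail
          task.addDependentTask(barrier);          // and the barrier waits on every task
        }
        tail = barrier;                            // the next event hangs off this barrier
      }
      System.out.println("final tail: " + tail.name);   // prints ev2.barrier
    }
  }

The same barrier idea also backed the removed addUpdateReplStateTasks: import tasks were collected behind a DependencyCollectionWork barrier before the repl.last.id updates were attached.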
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 9fdf74258d..ecde3ceaab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,7 +19,7 @@
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 70f4fed71a..f05c23114a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index b59cdf2b54..e68e0550eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -22,7 +22,7 @@
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 939884d0a4..4a2fdd243d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,7 +20,7 @@
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
index 309debe7d2..166cf874d7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.junit.Test;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
index 32b4a72b5a..41ab447de8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;