diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 14235b5719..4e989792e7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.conf; import com.google.common.base.Joiner; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; @@ -37,7 +36,6 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; import org.apache.hive.common.HiveCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -457,6 +455,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Inteval for cmroot cleanup thread."), REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/", "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"), + REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000, + "Provide an approximation of the maximum number of tasks that should be executed before \n" + + "dynamically generating the next set of tasks. The number is approximate as we will \n" + + "stop at a slightly higher number, the reason being that some events might lead to a \n" + + "task increment that crosses the specified limit"), + LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index c431537918..2af728fdc5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -22,6 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.junit.AfterClass; @@ -89,7 +90,7 @@ public void testCreateFunctionIncrementalReplication() throws Throwable { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(bootStrapDump.lastReplicationId); + .verifyResult(bootStrapDump.lastReplicationId); primary.run("CREATE FUNCTION " + primaryDbName + ".testFunction as 'hivemall.tools.string.StopwordUDF' " @@ -99,16 +100,16 @@ primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - 
.verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); // Test the idempotent behavior of CREATE FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); } @Test @@ -119,7 +120,7 @@ public void testDropFunctionIncrementalReplication() throws Throwable { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(bootStrapDump.lastReplicationId); + .verifyResult(bootStrapDump.lastReplicationId); primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); @@ -127,16 +128,16 @@ primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '*testfunction*'") - .verify(null); + .verifyResult(null); // Test the idempotent behavior of DROP FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '*testfunction*'") - .verify(null); + .verifyResult(null); } @Test @@ -148,7 +149,7 @@ public void testBootstrapFunctionReplication() throws Throwable { replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); } @Test @@ -164,7 +165,7 @@ public void testCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable { replica.load(replicatedDbName, tuple.dumpLocation) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".anotherFunction"); + .verifyResult(replicatedDbName + ".anotherFunction"); FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus( new Path( @@ -218,4 +219,32 @@ private Dependencies dependencies(String ivyPath, WarehouseInstance onWarehouse) }).collect(Collectors.toList()); return new Dependencies(collect); } + + /* + From the hive logs (hive.log) we can also check for the info statement: + fgrep "Total Tasks" [location of hive.log] + Each line of that output indicates one run of the load task. + */ + @Test + public void testMultipleStagesOfReplicationLoadTask() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='india') values ('mumbai')") + .run("insert into table t2 partition(country='india') values ('delhi')") + .run("create table t3 (rank int)") + .dump(primaryDbName, null); + + // each table creation itself takes more than one task; given we allow a max of 1, we should hit multiple runs. 
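The setIntVar call that follows is the typed way of applying the new limit; a minimal sketch of the same knob set outside this test harness, assuming only a HiveConf instance in hand (illustrative only, not part of the patch; the string key is the hive.repl.approx.max.load.tasks entry added above):

    // illustrative only: cap the number of tasks generated per ReplLoadTask run
    HiveConf conf = new HiveConf();
    conf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1);
    // equivalently, via the plain Configuration API:
    // conf.set("hive.repl.approx.max.load.tasks", "1");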
+ replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 1128eae6f9..c2d8d85e2c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -56,7 +56,7 @@ Licensed to the Apache Software Foundation (ASF) under one final String functionsRoot; private Logger logger; private Driver driver; - private HiveConf hiveConf; + HiveConf hiveConf; MiniDFSCluster miniDFSCluster; private HiveMetaStoreClient client; @@ -71,12 +71,13 @@ Licensed to the Apache Software Foundation (ASF) under one assert miniDFSCluster.isDataNodeUp(); DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier); Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier); this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString(); - initialize(cmRootPath.toString()); + initialize(cmRootPath.toString(), warehouseRoot.toString()); } - private void initialize(String cmRoot) throws Exception { + private void initialize(String cmRoot, String warehouseRoot) throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname); String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp") @@ -90,6 +91,7 @@ private void initialize(String cmRoot) throws Exception { } // turn on db notification listener on meta store + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot); hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS); hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); @@ -176,7 +178,7 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro return this; } - WarehouseInstance verify(String data) throws IOException { + WarehouseInstance verifyResult(String data) throws IOException { verifyResults(data == null ? new String[] {} : new String[] { data }); return this; } @@ -188,7 +190,7 @@ WarehouseInstance verify(String data) throws IOException { * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case * before assert. 
*/ - private void verifyResults(String[] data) throws IOException { + WarehouseInstance verifyResults(String[] data) throws IOException { List results = getOutput(); logger.info("Expecting {}", StringUtils.join(data, ",")); logger.info("Got {}", results); @@ -196,6 +198,7 @@ private void verifyResults(String[] data) throws IOException { for (int i = 0; i < data.length; i++) { assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase()); } + return this; } List getOutput() throws IOException { diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index dc55805221..b50e88365e 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -101,6 +101,7 @@ enum StageType { DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP, + REPL_BOOTSTRAP_LOAD, } struct Stage { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 7254d504fe..486c0df78c 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -116,7 +116,8 @@ int _kStageTypeValues[] = { StageType::STATS, StageType::DEPENDENCY_COLLECTION, StageType::COLUMNSTATS, - StageType::REPLDUMP + StageType::REPLDUMP, + StageType::REPL_BOOTSTRAP_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -131,9 +132,10 @@ const char* _kStageTypeNames[] = { "STATS", "DEPENDENCY_COLLECTION", "COLUMNSTATS", - "REPLDUMP" + "REPLDUMP", + "REPL_BOOTSTRAP_LOAD" }; -const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index 38d054b119..5b093ce889 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -94,7 +94,8 @@ struct StageType { STATS = 9, DEPENDENCY_COLLECTION = 10, COLUMNSTATS = 11, - REPLDUMP = 12 + REPLDUMP = 12, + REPL_BOOTSTRAP_LOAD = 13 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index deca5745d8..7902bc5b5a 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -24,7 +24,8 @@ STATS(9), DEPENDENCY_COLLECTION(10), COLUMNSTATS(11), - REPLDUMP(12); + REPLDUMP(12), + REPL_BOOTSTRAP_LOAD(13); private final int value; @@ -71,6 +72,8 @@ public static StageType findByValue(int value) { return COLUMNSTATS; case 12: return REPLDUMP; + case 13: + return REPL_BOOTSTRAP_LOAD; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index 4d902ee8a1..def82ea30e 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -115,6 +115,7 @@ final class StageType { const DEPENDENCY_COLLECTION = 10; const COLUMNSTATS = 11; const REPLDUMP = 12; + const REPL_BOOTSTRAP_LOAD = 13; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -129,6 +130,7 @@ final class StageType { 10 => 'DEPENDENCY_COLLECTION', 11 => 'COLUMNSTATS', 12 => 'REPLDUMP', + 13 => 'REPL_BOOTSTRAP_LOAD', ); } diff --git 
a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 9e29129896..837ef744f0 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -161,6 +161,7 @@ class StageType: DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 REPLDUMP = 12 + REPL_BOOTSTRAP_LOAD = 13 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -176,6 +177,7 @@ class StageType: 10: "DEPENDENCY_COLLECTION", 11: "COLUMNSTATS", 12: "REPLDUMP", + 13: "REPL_BOOTSTRAP_LOAD", } _NAMES_TO_VALUES = { @@ -192,6 +194,7 @@ class StageType: "DEPENDENCY_COLLECTION": 10, "COLUMNSTATS": 11, "REPLDUMP": 12, + "REPL_BOOTSTRAP_LOAD": 13, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 1433d4a862..3b10499673 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -73,8 +73,9 @@ module StageType DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 REPLDUMP = 12 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze + REPL_BOOTSTRAP_LOAD = 13 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP", 13 => "REPL_BOOTSTRAP_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP, REPL_BOOTSTRAP_LOAD]).freeze end class Adjacency diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index fdcf0522ac..9183edf067 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -561,7 +561,7 @@ private String nextPathId() { private static final String MR_PREFIX = "-mr-"; - private static final String EXT_PREFIX = "-ext-"; + public static final String EXT_PREFIX = "-ext-"; private static final String LOCAL_PREFIX = "-local-"; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 94d6c5a327..91ac4bf985 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; @@ -111,6 +113,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple(TezWork.class, TezTask.class)); taskvec.add(new TaskTuple(SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); + taskvec.add(new TaskTuple<>(ReplLoadWork.class, 
ReplLoadTask.class)); } private static ThreadLocal tid = new ThreadLocal() { @@ -149,6 +152,7 @@ public static void resetId() { throw new RuntimeException("No task for work class " + workClass.getName()); } + @SafeVarargs public static Task get(T work, HiveConf conf, Task... tasklist) { Task ret = get((Class) work.getClass(), conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index f9bdff8381..ff522c5d63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -63,7 +63,7 @@ Licensed to the Apache Software Foundation (ASF) under one public class ReplDumpTask extends Task implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private static final String FUNCTION_METADATA_DIR_NAME = "_metadata"; + private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private final static String TMP_TABLE_PREFIX = SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase(); @@ -91,6 +91,7 @@ protected int execute(DriverContext driverContext) { prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); } catch (Exception e) { LOG.error("failed", e); + setException(e); return 1; } return 0; @@ -311,9 +312,9 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception continue; } Path functionRoot = new Path(functionsRoot, functionName); - Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME); + Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME); try (JsonWriter jsonWriter = - new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) { + new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) { FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf); serializer.writeTo(jsonWriter, tuple.replicationSpec); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java new file mode 100644 index 0000000000..8ee089c6dd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -0,0 +1,251 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions; +import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; + +public class ReplLoadTask extends Task implements Serializable { + private final static int ZERO_TASKS = 0; + + @Override + public String getName() { + return "BOOTSTRAP_LOAD"; + } + + /** + * Provides the root Tasks created as a result of this loadTask run which will be executed + * by the driver. It does not track details across multiple runs of LoadTask. + */ + private static class Scope { + boolean database = false, table = false, partition = false; + List> rootTasks = new ArrayList<>(); + } + + @Override + protected int execute(DriverContext driverContext) { + try { + int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); + Context context = new Context(conf, getHive()); + TaskTracker loadTaskTracker = new TaskTracker(maxTasks); + /* + for now for simplicity we are doing just one directory ( one database ), come back to use + of multiple databases once we have the basic flow to chain creating of tasks in place for + a database ( directory ) + */ + BootstrapEventsIterator iterator = work.iterator(); + /* + This is used to get hold of a reference during the current creation of tasks and is initialized + with "0" tasks such that it will be non consequential in any operations done with task tracker + compositions. + */ + TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); + TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); + Scope scope = new Scope(); + while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) { + BootstrapEvent next = iterator.next(); + switch (next.eventType()) { + case Database: + DatabaseEvent dbEvent = (DatabaseEvent) next; + dbTracker = + new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker) + .tasks(); + loadTaskTracker.update(dbTracker); + if (work.hasDbState()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + } + work.updateDbEventState(dbEvent.toState()); + scope.database = true; + scope.rootTasks.addAll(dbTracker.tasks()); + dbTracker.debugLog("database"); + break; + case Table: { + /* + Implicit assumption here is that database level is processed first before table level, + which will depend on the iterator used since it should provide the higher level directory + listing before providing the lower level listing. This is also required such that + the dbTracker / tableTracker are setup correctly always. 
+ */ + TableContext tableContext = + new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); + TableEvent tableEvent = (TableEvent) next; + LoadTable loadTable = new LoadTable(tableEvent, context, tableContext); + tableTracker = loadTable.tasks(); + if (!scope.database) { + scope.rootTasks.addAll(tableTracker.tasks()); + scope.table = true; + } + setUpDependencies(dbTracker, tableTracker); + /* + for table replication, if we reach the max number of tasks then on the next run we will + try to reload the same table again. This is mainly to keep the code easy to follow, + as it lets us avoid distinguishing between loading partitions for a table whose + creation led to reaching max tasks, versus loading the next table because the current + one has no partitions. + */ + + // for a table we explicitly try to load partitions as there are no separate partition events. + LoadPartitions loadPartitions = + new LoadPartitions(context, tableTracker, tableEvent, work.dbNameToLoadIn, + tableContext); + TaskTracker partitionsTracker = loadPartitions.tasks(); + partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, + partitionsTracker); + tableTracker.debugLog("table"); + partitionsTracker.debugLog("partitions for table"); + break; + } + case Partition: { + /* + This will happen only when loading tables and we reach the limit on the number of tasks we can create; + hence we know here that the table should exist and there should be a lastPartitionName + */ + PartitionEvent event = (PartitionEvent) next; + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn, + work.tableNameToLoadIn); + LoadPartitions loadPartitions = + new LoadPartitions(context, tableContext, loadTaskTracker, event.asTableEvent(), + work.dbNameToLoadIn, + event.lastPartitionReplicated()); + /* + the tableTracker here should be a new instance and not an existing one as this can + only happen when we break in between loading partitions. + */ + TaskTracker partitionsTracker = loadPartitions.tasks(); + partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, + partitionsTracker); + partitionsTracker.debugLog("partitions"); + break; + } + case Function: { + LoadFunction loadFunction = + new LoadFunction(context, (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker functionsTracker = loadFunction.tasks(); + if (!scope.database) { + scope.rootTasks.addAll(functionsTracker.tasks()); + } else { + setUpDependencies(dbTracker, functionsTracker); + } + loadTaskTracker.update(functionsTracker); + functionsTracker.debugLog("functions"); + break; + } + } + } + boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState(); + createBuilderTask(scope.rootTasks, addAnotherLoadTask); + if (!iterator.hasNext()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + } + this.childTasks = scope.rootTasks; + LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); + } catch (Exception e) { + LOG.error("failed replication", e); + setException(e); + return 1; + } + LOG.info("completed load task run : {}", work.executedLoadTask()); + return 0; + } + + /** + * There was a database update done before and we want to make sure we update the last repl + * id on this database as we are now going to switch to processing a new database. 
+ */ + private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope) + throws SemanticException { + /* + we don't want to put any limits on this task as this is essential before we start + processing new database events. + */ + TaskTracker taskTracker = + new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn, + new TaskTracker(maxTasks)).tasks(); + scope.rootTasks.addAll(taskTracker.tasks()); + return taskTracker; + } + + private void partitionsPostProcessing(BootstrapEventsIterator iterator, + Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker, + TaskTracker partitionsTracker) throws SemanticException { + setUpDependencies(tableTracker, partitionsTracker); + if (!scope.database && !scope.table) { + scope.rootTasks.addAll(partitionsTracker.tasks()); + scope.partition = true; + } + loadTaskTracker.update(tableTracker); + loadTaskTracker.update(partitionsTracker); + if (partitionsTracker.hasReplicationState()) { + iterator.setReplicationState(partitionsTracker.replicationState()); + } + } + + /* + This sets up dependencies such that a child task is dependant on the parent to be complete. + */ + private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { + for (Task parentTask : parentTasks.tasks()) { + for (Task childTask : childTasks.tasks()) { + parentTask.addDependentTask(childTask); + } + } + } + + private void createBuilderTask(List> rootTasks, + boolean shouldCreateAnotherLoadTask) { + /* + do a depth first traversal here to set the dependency collection task as the final task + followed by the next load task so that the chain continues forward, this additional traversal + might not be required if we pass the dependency collection task as a context member which + then can be set explicitly on all tasks created further down, and we can just add the load task + if there is a replication state present. 
+ */ + Task dependencyTask = TaskFactory.get(new DependencyCollectionWork(), conf); + setupDependencyTask(rootTasks, dependencyTask); + + if (shouldCreateAnotherLoadTask) { + Task loadTask = TaskFactory.get(work, conf); + dependencyTask.addDependentTask(loadTask); + } + } + + private void setupDependencyTask(List> tasks, + Task dependencyTask) { + if (tasks != null && !tasks.isEmpty()) { + for (Task task : tasks) { + if(task.equals(dependencyTask)) { + continue; + } + task.addDependentTask(dependencyTask); + setupDependencyTask(task.getChildTasks(), dependencyTask); + } + } + } + + @Override + public StageType getType() { + return StageType.REPL_BOOTSTRAP_LOAD; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java new file mode 100644 index 0000000000..46a1f7f18f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -0,0 +1,54 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.plan.Explain; + +import java.io.IOException; +import java.io.Serializable; + +@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class ReplLoadWork implements Serializable { + final String dbNameToLoadIn; + final String tableNameToLoadIn; + private final BootstrapEventsIterator iterator; + private int loadTaskRunCount = 0; + private DatabaseEvent.State state = null; + + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, + String tableNameToLoadIn) throws IOException { + this.tableNameToLoadIn = tableNameToLoadIn; + this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf); + this.dbNameToLoadIn = dbNameToLoadIn; + } + + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern) + throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null); + } + + public BootstrapEventsIterator iterator() { + return iterator; + } + + int executedLoadTask() { + return ++loadTaskRunCount; + } + + void updateDbEventState(DatabaseEvent.State state) { + this.state = state; + } + + DatabaseEvent databaseEvent(HiveConf hiveConf) { + DatabaseEvent databaseEvent = state.toEvent(hiveConf); + state = null; + return databaseEvent; + } + + boolean hasDbState() { + return state != null; + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java new file mode 100644 index 0000000000..bc693f6151 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +public interface BootstrapEvent { + + EventType eventType(); + + enum EventType { + Database, Table, Function, Partition + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java new file mode 100644 index 0000000000..2828b2cfdd --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.io.Serializable; + +public interface DatabaseEvent extends BootstrapEvent { + Database dbInMetadata(String dbNameToOverride) throws SemanticException; + + State toState(); + + interface State extends Serializable { + DatabaseEvent toEvent(HiveConf hiveConf); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java new file mode 100644 index 0000000000..bce12e010f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +import org.apache.hadoop.fs.Path; + +/** + * Exposing the FileSystem implementation outside which is what it should NOT do. + *

+ * Since the bootstrap and incremental for functions is handled similarly. There + * is additional work to make sure we pass the event object from both places. + * + * @see org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.FunctionDescBuilder + * would be merged here mostly. + */ +public interface FunctionEvent extends BootstrapEvent { + Path rootDir(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java new file mode 100644 index 0000000000..29c190ba6c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +public interface PartitionEvent extends TableEvent { + String lastPartitionReplicated(); + + TableEvent asTableEvent(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java new file mode 100644 index 0000000000..02844fd465 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; + +import java.util.List; + +public interface TableEvent extends BootstrapEvent { + ImportTableDesc tableDesc(String dbName) throws SemanticException; + + List partitionDescriptions(ImportTableDesc tblDesc) + throws SemanticException; + + ReplicationSpec replicationSpec(); + + boolean shouldNotReplicate(); + + /** + * Exposing the FileSystem implementation outside which is what it should NOT do. + */ + Path metadataPath(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java new file mode 100644 index 0000000000..8228b59fe2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -0,0 +1,116 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Replication layout is from the root directory of replication Dump is + * db + * table1 + * _metadata + * data + * _files + * table2 + * _metadata + * data + * _files + * _functions + * functionName1 + * _metadata + * functionName2 + * _metadata + * this class understands this layout and hence will help in identifying for subsequent bootstrap tasks + * as to where the last set of tasks left execution and from where this task should pick up replication. 
+ * Since for replication we have the need for hierarchy of tasks we need to make sure that db level are + * processed first before table, table level are processed first before partitions etc. + * + * Based on how the metadata is being exported on the file we have to currently take care of the following: + * 1. Make sure db level are processed first as this will be required before table / functions processing. + * 2. Table before partition is not explicitly required as table and partition metadata are in the same file. + * + * + * For future integrations other sources of events like kafka, would require to implement an Iterator + * + */ +public class BootstrapEventsIterator implements Iterator { + private DatabaseEventsIterator currentDatabaseIterator = null; + /* + This denotes listing of any directories where during replication we want to take care of + db level operations first, namely in our case its only during db creation on the replica + warehouse. + */ + private Iterator dbEventsIterator; + + public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { + Path path = new Path(dumpDirectory); + FileSystem fileSystem = path.getFileSystem(hiveConf); + FileStatus[] fileStatuses = + fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem)); + + List dbsToCreate = Arrays.stream(fileStatuses).filter(f -> { + Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME); + try { + return fileSystem.exists(metadataPath); + } catch (IOException e) { + throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e); + } + }).collect(Collectors.toList()); + dbEventsIterator = dbsToCreate.stream().map(f -> { + try { + return new DatabaseEventsIterator(f.getPath(), hiveConf); + } catch (IOException e) { + throw new RuntimeException( + "Error while creating event iterator for db at path" + f.getPath().toString(), e); + } + }).collect(Collectors.toList()).iterator(); + + } + + @Override + public boolean hasNext() { + while (true) { + if (currentDatabaseIterator == null) { + if (dbEventsIterator.hasNext()) { + currentDatabaseIterator = dbEventsIterator.next(); + } else { + return false; + } + } else if (currentDatabaseIterator.hasNext()) { + return true; + } else { + currentDatabaseIterator = null; + } + } + } + + @Override + public BootstrapEvent next() { + return currentDatabaseIterator.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("This operation is not supported"); + } + + @Override + public void forEachRemaining(Consumer action) { + throw new UnsupportedOperationException("This operation is not supported"); + } + + public void setReplicationState(ReplicationState replicationState) { + this.currentDatabaseIterator.replicationState = replicationState; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java new file mode 100644 index 0000000000..2edd9652d4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -0,0 +1,124 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME; + +class DatabaseEventsIterator implements Iterator { + private static Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class); + private RemoteIterator remoteIterator; + + private final Path dbLevelPath; + private HiveConf hiveConf; + ReplicationState replicationState; + private Path next = null, previous = null; + private boolean databaseEventProcessed = false; + + DatabaseEventsIterator(Path dbLevelPath, HiveConf hiveConf) throws IOException { + this.dbLevelPath = dbLevelPath; + this.hiveConf = hiveConf; + FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf); + // this is only there for the use case where we are doing table only replication and not database level + if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) { + databaseEventProcessed = true; + } + remoteIterator = fileSystem.listFiles(dbLevelPath, true); + } + + @Override + public boolean hasNext() { + try { + if (!databaseEventProcessed) { + next = dbLevelPath; + return true; + } + + if (replicationState == null && next == null) { + while (remoteIterator.hasNext()) { + LocatedFileStatus next = remoteIterator.next(); + if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) { + String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), ""); + List filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR)) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toList()); + if (filteredNames.size() == 1) { + // this relates to db level event tracked via databaseEventProcessed + } else { + this.next = next.getPath().getParent(); + return true; + } + } + } + return false; + } + return true; + } catch (Exception e) { + // may be do some retry logic here. + throw new RuntimeException("could not traverse the file via remote iterator " + dbLevelPath, + e); + } + } + + /* + we handle three types of scenarios with special case. + 1. handling of db Level _metadata + 2. handling of subsequent loadTask which will start running from the previous replicationState + 3. other events : these can only be either table / function _metadata. 
+ */ + @Override + public BootstrapEvent next() { + if (!databaseEventProcessed) { + FSDatabaseEvent event = new FSDatabaseEvent(hiveConf, next.toString()); + databaseEventProcessed = true; + return postProcessing(event); + } + + if (replicationState != null) { + return eventForReplicationState(); + } + + String currentPath = next.toString(); + if (currentPath.contains(FUNCTIONS_ROOT_DIR_NAME)) { + LOG.debug("functions directory: {}", next.toString()); + return postProcessing(new FSFunctionEvent(next)); + } + return postProcessing(new FSTableEvent(hiveConf, next.toString())); + } + + private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) { + previous = next; + next = null; + LOG.debug("processing " + previous); + return bootstrapEvent; + } + + private BootstrapEvent eventForReplicationState() { + if (replicationState.partitionState != null) { + BootstrapEvent + bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState); + replicationState = null; + return bootstrapEvent; + } else if (replicationState.lastTableReplicated != null) { + FSTableEvent event = new FSTableEvent(hiveConf, previous.toString()); + replicationState = null; + return event; + } + throw new IllegalStateException("for replicationState " + replicationState.toString()); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java new file mode 100644 index 0000000000..5692bfae89 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java @@ -0,0 +1,71 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; + +public class FSDatabaseEvent implements DatabaseEvent { + + private final Path dbMetadataFile; + private final FileSystem fileSystem; + + FSDatabaseEvent(HiveConf hiveConf, String dbDumpDirectory) { + try { + this.dbMetadataFile = new Path(dbDumpDirectory, EximUtil.METADATA_NAME); + this.fileSystem = dbMetadataFile.getFileSystem(hiveConf); + } catch (Exception e) { + String message = "Error while identifying the filesystem for db " + + "metadata file in " + dbDumpDirectory; + throw new RuntimeException(message, e); + } + } + + @Override + public Database dbInMetadata(String dbNameToOverride) throws SemanticException { + try { + MetaData rv = EximUtil.readMetaData(fileSystem, dbMetadataFile); + Database dbObj = rv.getDatabase(); + if (dbObj == null) { + throw new IllegalArgumentException( + "_metadata file read did not contain a db object - invalid dump."); + } + + // override the db name if provided in repl load command + if (StringUtils.isNotBlank(dbNameToOverride)) { + dbObj.setName(dbNameToOverride); + } + return dbObj; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + @Override + public State toState() { + return new FSDBState(dbMetadataFile.getParent().toString()); + } + + @Override + public EventType eventType() { + return EventType.Database; + } + + static 
class FSDBState implements DatabaseEvent.State { + final String dbDumpDirectory; + + FSDBState(String dbDumpDirectory) { + this.dbDumpDirectory = dbDumpDirectory; + } + + @Override + public DatabaseEvent toEvent(HiveConf hiveConf) { + return new FSDatabaseEvent(hiveConf, dbDumpDirectory); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java new file mode 100644 index 0000000000..abf2693341 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; + +public class FSFunctionEvent implements FunctionEvent { + private final Path rootDir; + + FSFunctionEvent(Path rootDir) { + this.rootDir = rootDir; + } + + @Override + public Path rootDir() { + return rootDir; + } + + @Override + public EventType eventType() { + return EventType.Function; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java new file mode 100644 index 0000000000..8e818856c1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; + +import java.util.List; + +public class FSPartitionEvent implements PartitionEvent { + + private final ReplicationState replicationState; + private final TableEvent tableEvent; + + FSPartitionEvent(HiveConf hiveConf, String metadataDir, + ReplicationState replicationState) { + tableEvent = new FSTableEvent(hiveConf, metadataDir); + this.replicationState = replicationState; + } + + @Override + public EventType eventType() { + return EventType.Partition; + } + + @Override + public String lastPartitionReplicated() { + assert replicationState != null && replicationState.partitionState != null; + return replicationState.partitionState.lastPartitionReplicated; + } + + @Override + public TableEvent asTableEvent() { + return tableEvent; + } + + @Override + public ImportTableDesc tableDesc(String dbName) throws SemanticException { + return tableEvent.tableDesc(dbName); + } + + @Override + public List partitionDescriptions(ImportTableDesc tblDesc) + throws SemanticException { + return tableEvent.partitionDescriptions(tblDesc); + } + + @Override + public ReplicationSpec replicationSpec() { + return tableEvent.replicationSpec(); + } + + @Override + public boolean shouldNotReplicate() { + return tableEvent.shouldNotReplicate(); + } + + @Override + public Path metadataPath() { + return tableEvent.metadataPath(); + } +} diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java new file mode 100644 index 0000000000..6b7eaf017e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +public class FSTableEvent implements TableEvent { + private final Path fromPath; + private final MetaData metadata; + + FSTableEvent(HiveConf hiveConf, String metadataDir) { + try { + URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir)); + fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + FileSystem fs = FileSystem.get(fromURI, hiveConf); + metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean shouldNotReplicate() { + ReplicationSpec spec = metadata.getReplicationSpec(); + return spec.isNoop() || !spec.isInReplicationScope(); + } + + @Override + public Path metadataPath() { + return fromPath; + } + + @Override + public ImportTableDesc tableDesc(String dbName) throws SemanticException { + try { + Table table = new Table(metadata.getTable()); + ImportTableDesc tableDesc = new ImportTableDesc(dbName, table); + tableDesc.setReplicationSpec(metadata.getReplicationSpec()); + return tableDesc; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + @Override + public List partitionDescriptions(ImportTableDesc tblDesc) + throws SemanticException { + List descs = new ArrayList<>(); + //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose. 
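A hypothetical sketch of the lazier shape the TODO above hints at, converting each partition to an AddPartitionDesc only when it is consumed rather than materializing the whole list; it assumes metadata.getPartitions() can be iterated on demand and reuses the private partitionDesc helper defined below (illustrative only, not part of this patch). The eager loop that follows is what the patch actually does.

    // hypothetical: map metastore Partition objects to AddPartitionDesc on demand
    // (requires java.util.Iterator; fromPath and tblDesc are the field/parameter already in scope)
    Iterator<Partition> parts = metadata.getPartitions().iterator();
    Iterator<AddPartitionDesc> lazyDescs = new Iterator<AddPartitionDesc>() {
      @Override public boolean hasNext() { return parts.hasNext(); }
      @Override public AddPartitionDesc next() {
        try {
          return partitionDesc(fromPath, tblDesc, parts.next());
        } catch (SemanticException e) {
          throw new RuntimeException(e);
        }
      }
    };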
+ for (Partition partition : metadata.getPartitions()) { + // TODO: this should ideally not create AddPartitionDesc per partition + AddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition); + descs.add(partsDesc); + } + return descs; + } + + private AddPartitionDesc partitionDesc(Path fromPath, + ImportTableDesc tblDesc, Partition partition) throws SemanticException { + try { + AddPartitionDesc partsDesc = + new AddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(), + EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), + partition.getSd().getLocation(), partition.getParameters()); + AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0); + partDesc.setInputFormat(partition.getSd().getInputFormat()); + partDesc.setOutputFormat(partition.getSd().getOutputFormat()); + partDesc.setNumBuckets(partition.getSd().getNumBuckets()); + partDesc.setCols(partition.getSd().getCols()); + partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); + partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); + partDesc.setBucketCols(partition.getSd().getBucketCols()); + partDesc.setSortCols(partition.getSd().getSortCols()); + partDesc.setLocation(new Path(fromPath, + Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); + partsDesc.setReplicationSpec(metadata.getReplicationSpec()); + return partsDesc; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + @Override + public ReplicationSpec replicationSpec() { + return metadata.getReplicationSpec(); + } + + @Override + public EventType eventType() { + return EventType.Table; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java new file mode 100644 index 0000000000..9480b0bac0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -0,0 +1,112 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +public class LoadDatabase { + + final Context context; + final TaskTracker tracker; + + private final DatabaseEvent event; + private final String dbNameToLoadIn; + + public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, + TaskTracker loadTaskTracker) { + this.context = context; + this.event = event; + this.dbNameToLoadIn = dbNameToLoadIn; + this.tracker = new TaskTracker(loadTaskTracker); + } + + public TaskTracker tasks() throws SemanticException { + try { + Database dbInMetadata = readDbMetadata(); + Task dbRootTask = 
existEmptyDb(dbInMetadata.getName()) + ? alterDbTask(dbInMetadata, context.hiveConf) + : createDbTask(dbInMetadata); + tracker.addTask(dbRootTask); + return tracker; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + Database readDbMetadata() throws SemanticException { + return event.dbInMetadata(dbNameToLoadIn); + } + + private Task createDbTask(Database dbObj) { + CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); + createDbDesc.setName(dbObj.getName()); + createDbDesc.setComment(dbObj.getDescription()); + + /* + explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going + to run multiple times and explicit logic is in place which prevents updates to tables when db level + last repl id is set and we create a AlterDatabaseTask at the end of processing a database. + */ + Map parameters = new HashMap<>(dbObj.getParameters()); + parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + createDbDesc.setDatabaseProperties(parameters); + // note that we do not set location - for repl load, we want that auto-created. + createDbDesc.setIfNotExists(false); + // If it exists, we want this to be an error condition. Repl Load is not intended to replace a + // db. + // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on. + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), createDbDesc); + return TaskFactory.get(work, context.hiveConf); + } + + private static Task alterDbTask(Database dbObj, HiveConf hiveConf) { + AlterDatabaseDesc alterDbDesc = + new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null); + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); + return TaskFactory.get(work, hiveConf); + } + + private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { + Database db = context.hiveDb.getDatabase(dbName); + if (db == null) { + return false; + } + List allTables = context.hiveDb.getAllTables(dbName); + List allFunctions = context.hiveDb.getFunctions(dbName, "*"); + if (allTables.isEmpty() && allFunctions.isEmpty()) { + return true; + } + throw new InvalidOperationException( + "Database " + db.getName() + " is not empty. 
One or more tables/functions exist."); + } + + public static class AlterDatabase extends LoadDatabase { + + public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, + TaskTracker loadTaskTracker) { + super(context, event, dbNameToLoadIn, loadTaskTracker); + } + + @Override + public TaskTracker tasks() throws SemanticException { + tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf)); + return tracker; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java new file mode 100644 index 0000000000..87fe4db4de --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -0,0 +1,56 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.List; + +import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes; + +public class LoadFunction { + private static final Logger LOG = LoggerFactory.getLogger(LoadFunction.class); + private Context context; + private final FunctionEvent event; + private final String dbNameToLoadIn; + private final TaskTracker tracker; + + public LoadFunction(Context context, FunctionEvent event, String dbNameToLoadIn, + TaskTracker existingTracker) { + this.context = context; + this.event = event; + this.dbNameToLoadIn = dbNameToLoadIn; + this.tracker = new TaskTracker(existingTracker); + } + + public TaskTracker tasks() throws IOException, SemanticException { + URI fromURI = EximUtil + .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString())); + Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + + try { + CreateFunctionHandler handler = new CreateFunctionHandler(); + List> tasks = handler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, null, context.hiveConf, + context.hiveDb, null, LOG) + ); + tasks.forEach(tracker::addTask); + return tracker; + } catch (Exception e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java new file mode 100644 index 0000000000..550897a1bd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java @@ -0,0 +1,39 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import java.io.Serializable; + +public class ReplicationState implements Serializable { + + public static class PartitionState { + final String tableName; + public final String lastPartitionReplicated; + + public PartitionState(String tableName, String lastPartitionReplicated) { + this.tableName 
= tableName; + this.lastPartitionReplicated = lastPartitionReplicated; + } + } + + // null for a non-partitioned table. + public final PartitionState partitionState; + // for a non-partitioned table this is the name of the last table replicated; otherwise it is the name of the + // current partitioned table, whose last replicated partition is denoted by "lastPartitionReplicated" + public final String lastTableReplicated; + // name of the last function replicated; null if function replication was in progress when we created this state. + public final String functionName; + + public ReplicationState(PartitionState partitionState) { + this.partitionState = partitionState; + this.functionName = null; + this.lastTableReplicated = null; + } + + @Override + public String toString() { + return "ReplicationState{" + + "partitionState=" + partitionState + + ", lastTableReplicated='" + lastTableReplicated + '\'' + + ", functionName='" + functionName + '\'' + + '}'; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java new file mode 100644 index 0000000000..af82a27801 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java @@ -0,0 +1,96 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is responsible for tracking how many tasks have been created and for organizing them: + * once the number of tasks for the next execution has been created, we create a + * dependency collection task (DCT) -> another bootstrap task, + * and then add the DCT as a dependent of all existing tasks so the cycle can continue. + */ +public class TaskTracker { + private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class); + /** + * holds the root-level tasks for a given level (table / db / partition). + * it does not capture the task dependency notion of "table tasks <--- partition tasks" + */ + private final List> tasks = new ArrayList<>(); + private ReplicationState replicationState = null; + // since tasks themselves can be graphs, we want to limit the number of created + // tasks including all of their dependencies. + private int numberOfTasks = 0; + private final int maxTasksAllowed; + + public TaskTracker(int defaultMaxTasks) { + maxTasksAllowed = defaultMaxTasks; + } + + public TaskTracker(TaskTracker existing) { + maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks; + } + + /** + * this method adds a task and counts all the tasks in its graph. + * the graph, however, might be created in a disjoint fashion, in which case the task count can be + * updated via the "update" method.
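+ * Note: the task count maintained here includes every child task reachable from the added
+ * task, since updateTaskCount below walks getChildTasks() recursively.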
+ */ + public void addTask(Task task) { + tasks.add(task); + updateTaskCount(task); + } + + private void updateTaskCount(Task task) { + numberOfTasks += 1; + if (task.getChildTasks() != null) { + for (Task childTask : task.getChildTasks()) { + updateTaskCount(childTask); + } + } + } + + public boolean canAddMoreTasks() { + return numberOfTasks < maxTasksAllowed; + } + + public boolean hasTasks() { + return numberOfTasks != 0; + } + + public void update(TaskTracker withAnother) { + numberOfTasks += withAnother.numberOfTasks; + if (withAnother.hasReplicationState()) { + this.replicationState = withAnother.replicationState; + } + } + + public void setReplicationState(ReplicationState state) { + this.replicationState = state; + } + + public boolean hasReplicationState() { + return replicationState != null; + } + + public ReplicationState replicationState() { + return replicationState; + } + + public List> tasks() { + return tasks; + } + + public void debugLog(String forEventType) { + LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks, + tasks.size()); + } + + public int numberOfTasks() { + return numberOfTasks; + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java new file mode 100644 index 0000000000..1052dc6589 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -0,0 +1,259 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; +import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; +import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString; + +public class LoadPartitions { + private static Logger LOG 
= LoggerFactory.getLogger(LoadPartitions.class); + + private final Context context; + private final TableContext tableContext; + private final TableEvent event; + private final TaskTracker tracker; + private final String lastPartitionReplicated; + + private final ImportTableDesc tableDesc; + private Table table; + + public LoadPartitions(Context context, TaskTracker tableTracker, TableEvent event, + String dbNameToLoadIn, TableContext tableContext) throws HiveException, IOException { + this(context, tableContext, tableTracker, event, dbNameToLoadIn, null); + } + + public LoadPartitions(Context context, TableContext tableContext, TaskTracker tableTracker, + TableEvent event, String dbNameToLoadIn, String lastPartitionReplicated) + throws HiveException, IOException { + this.tracker = new TaskTracker(tableTracker); + this.event = event; + this.context = context; + this.lastPartitionReplicated = lastPartitionReplicated; + this.tableContext = tableContext; + + this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn)); + this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); + } + + private String location() throws MetaException, HiveException { + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + if (!tableContext.waitOnPrecursor()) { + return context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()).toString(); + } else { + Path tablePath = new Path( + context.warehouse.getDefaultDatabasePath(tableDesc.getDatabaseName()), + MetaStoreUtils.encodeTableName(tableDesc.getTableName().toLowerCase()) + ); + return context.warehouse.getDnsPath(tablePath).toString(); + } + } + + public TaskTracker tasks() throws SemanticException { + try { + /* + We are doing this both in load table and load partitions + */ + if (tableDesc.getLocation() == null) { + tableDesc.setLocation(location()); + } + + if (table == null) { + //new table + + table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName()); + if (isPartitioned(tableDesc)) { + updateReplicationState(initialReplicationState()); + return forNewTable(); + } + } else { + // existing + + if (table.isPartitioned()) { + List partitionDescs = event.partitionDescriptions(tableDesc); + if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) { + updateReplicationState(initialReplicationState()); + return forExistingTable(lastPartitionReplicated); + } + } + } + return tracker; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private void updateReplicationState(ReplicationState replicationState) throws SemanticException { + if (!tracker.canAddMoreTasks()) { + tracker.setReplicationState(replicationState); + } + } + + private ReplicationState initialReplicationState() throws SemanticException { + return new ReplicationState(new PartitionState(tableDesc.getTableName(), lastPartitionReplicated)); + } + + private TaskTracker forNewTable() throws Exception { + Iterator iterator = event.partitionDescriptions(tableDesc).iterator(); + while (iterator.hasNext() && tracker.canAddMoreTasks()) { + AddPartitionDesc addPartitionDesc = iterator.next(); + tracker.addTask(addSinglePartition(table, addPartitionDesc)); + ReplicationState currentReplicationState = + new ReplicationState(new PartitionState(table.getTableName(), + partSpecToString(addPartitionDesc.getPartition(0).getPartSpec())) + ); + updateReplicationState(currentReplicationState); + } + return tracker; + } + + /** + * returns the root task for adding a partition + */ + 
private Task addSinglePartition(Table table, + AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException { + AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); + Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); + Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); + partSpec.setLocation(replicaWarehousePartitionLocation.toString()); + LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " + + partSpecToString(partSpec.getPartSpec()) + " with source location: " + + partSpec.getLocation()); + Path tmpPath = context.utils.getExternalTmpPath(replicaWarehousePartitionLocation); + + Task copyTask = ReplCopyTask.getLoadCopyTask( + event.replicationSpec(), + sourceWarehousePartitionLocation, + tmpPath, + context.hiveConf + ); + + Task addPartTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), + context.hiveConf + ); + + Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath); + + copyTask.addDependentTask(addPartTask); + addPartTask.addDependentTask(movePartitionTask); + return copyTask; + } + + /** + * This will create the move of partition data from temp path to actual path + */ + private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, + Path tmpPath) { + LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), + event.replicationSpec().isReplace() + ); + loadTableWork.setInheritTableSpecs(false); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + return TaskFactory.get(work, context.hiveConf); + } + + private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec) + throws MetaException, HiveException, IOException { + String child = Warehouse.makePartPath(partSpec.getPartSpec()); + if (tableDesc.getLocation() == null) { + if (table.getDataLocation() == null) { + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + return new Path( + context.warehouse.getDefaultTablePath(parentDb, tableDesc.getTableName()), child); + } else { + return new Path(table.getDataLocation().toString(), child); + } + } else { + return new Path(tableDesc.getLocation(), child); + } + } + + private Task alterSinglePartition(AddPartitionDesc desc, + ReplicationSpec replicationSpec, Partition ptn) { + desc.setReplaceMode(true); + if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { + desc.setReplicationSpec(replicationSpec); + } + desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location + return TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), desc), + context.hiveConf + ); + } + + private TaskTracker forExistingTable(String lastPartitionReplicated) throws Exception { + boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); + ReplicationSpec replicationSpec = event.replicationSpec(); + LOG.debug("table partitioned"); + for (AddPartitionDesc addPartitionDesc : event.partitionDescriptions(tableDesc)) { + if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) { + Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Partition ptn; + + if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == null) { + if (!replicationSpec.isMetadataOnly()) { + forNewTable(); + } + } else { + // If replicating, then the partition already existing means we 
need to replace, maybe, if + // the destination ptn's repl.last.id is older than the replacement's. + if (replicationSpec.allowReplacementInto(ptn.getParameters())) { + if (replicationSpec.isMetadataOnly()) { + tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn)); + if (!tracker.canAddMoreTasks()) { + tracker.setReplicationState( + new ReplicationState(new PartitionState(table.getTableName(), + partSpecToString(partSpec)) + ) + ); + } + } else { + forNewTable(); + } + } else { + // ignore this ptn, do nothing, not an error. + } + } + } else { + String current = partSpecToString(addPartitionDesc.getPartition(0).getPartSpec()); + encounteredTheLastReplicatedPartition = lastPartitionReplicated.equals(current); + } + } + return tracker; + } +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java new file mode 100644 index 0000000000..f8af0cce63 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -0,0 +1,199 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.TreeMap; + +import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; + +public class LoadTable { + private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class); + // private final Helper helper; + private final Context context; + private final TableContext tableContext; + private final TaskTracker tracker; + private final TableEvent event; + + public LoadTable(TableEvent event, Context context, TableContext tableContext) + throws SemanticException, IOException { + this.event = event; + this.context = context; + this.tableContext = tableContext; + this.tracker = new TaskTracker(tableContext.parentTracker); + } + + public TaskTracker tasks() throws SemanticException { + // Path being passed to us is a table dump location. We go ahead and load it in as needed. + // If tblName is null, then we default to the table name specified in _metadata, which is good. + // or are both specified, in which case, that's what we are intended to create the new table as. 
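+ // Task shape produced here: one root DDL task per table (create-table for a new table,
+ // alter-table for an existing one); for a non-partitioned data load a ReplCopyTask -> MoveTask
+ // chain is attached to it, while partition data is handled separately by LoadPartitions.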
+ try { + if (event.shouldNotReplicate()) { + return tracker; + } + String dbName = tableContext.dbNameToLoadIn; // this can never be null or empty + // Table descriptor associated with the import: executed if relevant, and otherwise used to + // hold all the other details about the table. + ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName)); + Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); + ReplicationSpec replicationSpec = event.replicationSpec(); + + // Normally, on import, trying to create a table or a partition in a db that does not yet exist + // is an error condition. However, in the case of a REPL LOAD, it is possible that we are trying + // to create tasks to create a table inside a db that as-of-now does not exist, but there is + // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate + // defaults and do not error out in that case. + // This changes now that replication load is split into multiple execution tasks: the database + // may already have been created earlier, in which case waitOnPrecursor will be false and we + // should error out if the db is not found. + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + if (parentDb == null) { + if (!tableContext.waitOnPrecursor()) { + throw new SemanticException( + ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName())); + } + } + + if (table == null) { + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + if ((parentDb != null) && (!replicationSpec + .allowReplacementInto(parentDb.getParameters()))) { + // If the database state is newer than or the same as the current update based on repl.last.id, then just noop it. + return tracker; + } + } else { + if (!replicationSpec.allowReplacementInto(table.getParameters())) { + // If the target table exists and is newer than or the same as the current update based on repl.last.id, then just noop it. + return tracker; + } + } + + if (tableDesc.getLocation() == null) { + tableDesc.setLocation(location(tableDesc, parentDb)); + } + + + /* Note: In the following section, Metadata-only import handling logic is + interleaved with regular repl-import logic. The rule of thumb being + followed here is that MD-only imports are essentially ALTERs. They do + not load data, and should not be "creating" any metadata - they should + be replacing instead. The only place it makes sense for a MD-only import + to create is in the case of a table that's been dropped and recreated, + or in the case of an unpartitioned table. In all other cases, it should + behave like a noop or a pure MD alter. + */ + if (table == null) { + return newTableTasks(tableDesc); + } else { + return existingTableTasks(tableDesc, table, replicationSpec); + } + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private TaskTracker existingTableTasks(ImportTableDesc tblDesc, Table table, + ReplicationSpec replicationSpec) { + if (!table.isPartitioned()) { + + LOG.debug("table non-partitioned"); + if (!replicationSpec.allowReplacementInto(table.getParameters())) { + return tracker; // silently return, table is newer than our replacement.
+ } + + Task alterTableTask = alterTableTask(tblDesc, replicationSpec); + if (replicationSpec.isMetadataOnly()) { + tracker.addTask(alterTableTask); + } else { + Task loadTableTask = + loadTableTask(table, replicationSpec, event.metadataPath(), event.metadataPath()); + alterTableTask.addDependentTask(loadTableTask); + tracker.addTask(alterTableTask); + } + } + return tracker; + } + + private TaskTracker newTableTasks(ImportTableDesc tblDesc) throws SemanticException { + Table table; + table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); + // Either we're dropping and re-creating, or the table didn't exist, and we're creating. + Task createTableTask = + tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + if (event.replicationSpec().isMetadataOnly()) { + tracker.addTask(createTableTask); + return tracker; + } + if (!isPartitioned(tblDesc)) { + LOG.debug("adding dependent CopyWork/MoveWork for table"); + Task loadTableTask = + loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()), + event.metadataPath()); + createTableTask.addDependentTask(loadTableTask); + } + tracker.addTask(createTableTask); + return tracker; + } + + private String location(ImportTableDesc tblDesc, Database parentDb) + throws MetaException, SemanticException { + if (!tableContext.waitOnPrecursor()) { + return context.warehouse.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString(); + } else { + Path tablePath = new Path( + context.warehouse.getDefaultDatabasePath(tblDesc.getDatabaseName()), + MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase()) + ); + return context.warehouse.getDnsPath(tablePath).toString(); + } + } + + private Task loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, + Path fromURI) { + Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); + Path tmpPath = context.utils.getExternalTmpPath(tgtPath); + Task copyTask = + ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + + LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() + ); + MoveWork moveWork = + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); + copyTask.addDependentTask(loadTableTask); + return copyTask; + } + + private Task alterTableTask(ImportTableDesc tableDesc, + ReplicationSpec replicationSpec) { + tableDesc.setReplaceMode(true); + if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { + tableDesc.setReplicationSpec(replicationSpec); + } + return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java new file mode 100644 index 0000000000..0fa9e33dde --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; + +public class TableContext { + final String dbNameToLoadIn; + final TaskTracker parentTracker; + // this 
will only be available when we are doing table load only in replication not otherwise + private final String tableNameToLoadIn; + + public TableContext(TaskTracker parentTracker, String dbNameToLoadIn, + String tableNameToLoadIn) { + this.dbNameToLoadIn = dbNameToLoadIn; + this.parentTracker = parentTracker; + this.tableNameToLoadIn = tableNameToLoadIn; + } + + boolean waitOnPrecursor() { + return parentTracker.hasTasks(); + } + + ImportTableDesc overrideProperties(ImportTableDesc importTableDesc) + throws SemanticException { + if (StringUtils.isNotBlank(tableNameToLoadIn)) { + importTableDesc.setTableName(tableNameToLoadIn); + } + return importTableDesc; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java new file mode 100644 index 0000000000..7590be3505 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -0,0 +1,20 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.Hive; + +public class Context { + public final HiveConf hiveConf; + public final Hive hiveDb; + public final Warehouse warehouse; + public final PathUtils utils; + + public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException { + this.hiveConf = hiveConf; + this.hiveDb = hiveDb; + this.warehouse = new Warehouse(hiveConf); + this.utils = new PathUtils(hiveConf); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java new file mode 100644 index 0000000000..72b47b9b9a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/PathUtils.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.TaskRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.Context.EXT_PREFIX; +import static org.apache.hadoop.hive.ql.Context.generateExecutionId; + +public class PathUtils { + private static int pathId = 10000; + private static Logger LOG = LoggerFactory.getLogger(PathUtils.class); + + private final Map fsScratchDirs = new HashMap<>(); + private final String stagingDir; + private final HiveConf hiveConf; + + PathUtils(HiveConf hiveConf) { + this.hiveConf = hiveConf; + stagingDir = HiveConf.getVar(hiveConf, HiveConf.ConfVars.STAGINGDIR); + } + + public synchronized Path getExternalTmpPath(Path path) { + URI extURI = path.toUri(); + if (extURI.getScheme().equals("viewfs")) { + // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. + // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir + // on same namespace as tbl dir. 
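+ // e.g. for a target of viewfs://ns1/warehouse/db/t1/country=in, the staging dir is created
+ // under its parent viewfs://ns1/warehouse/db/t1 rather than under a /tmp location on a
+ // different namespace (example paths are illustrative only).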
+ return new Path(getStagingDir(path.getParent()), + EXT_PREFIX + Integer.toString(++pathId)); + } + Path fullyQualifiedPath = new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()); + return new Path(getStagingDir(fullyQualifiedPath), EXT_PREFIX + Integer.toString(++pathId)); + } + + private Path getStagingDir(Path inputPath) { + final URI inputPathUri = inputPath.toUri(); + final String inputPathName = inputPathUri.getPath(); + final String fileSystemAsString = inputPathUri.getScheme() + ":" + inputPathUri.getAuthority(); + + String stagingPathName; + if (!inputPathName.contains(stagingDir)) { + stagingPathName = new Path(inputPathName, stagingDir).toString(); + } else { + stagingPathName = + inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length()); + } + + final String key = + fileSystemAsString + "-" + stagingPathName + "-" + TaskRunner.getTaskRunnerID(); + + Path dir = fsScratchDirs.get(key); + try { + FileSystem fileSystem = inputPath.getFileSystem(hiveConf); + if (dir == null) { + // Append task specific info to stagingPathName, instead of creating a sub-directory. + // This way we don't have to worry about deleting the stagingPathName separately at + // end of query execution. + Path path = new Path( + stagingPathName + "_" + generateExecutionId() + "-" + TaskRunner.getTaskRunnerID()); + dir = fileSystem.makeQualified(path); + + LOG.debug("Created staging dir = " + dir + " for path = " + inputPath); + + if (!FileUtils.mkdir(fileSystem, dir, hiveConf)) { + throw new IllegalStateException( + "Cannot create staging directory '" + dir.toString() + "'"); + } + fileSystem.deleteOnExit(dir); + } + fsScratchDirs.put(key, dir); + return dir; + + } catch (IOException e) { + throw new RuntimeException( + "Cannot create staging directory '" + dir.toString() + "': " + e.getMessage(), e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index f3f206b6e2..37edd5c787 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.OutputFormat; +import org.slf4j.Logger; /** * ImportSemanticAnalyzer. 
@@ -353,7 +354,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, } private static Task createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){ - return tableDesc.getCreateTableTask(x); + return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf()); } private static Task dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){ @@ -370,7 +371,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){ tableDesc.setReplicationSpec(replicationSpec); } - return tableDesc.getCreateTableTask(x); + return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf()); } private static Task alterSinglePartition( @@ -452,29 +453,29 @@ private static void fixLocationInPartSpec( Warehouse.makePartPath(partSpec.getPartSpec())); } FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); - checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); partSpec.setLocation(tgtPath.toString()); } - private static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x) + public static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec, + Logger logger) throws IOException, SemanticException { if (replicationSpec.isInReplicationScope()){ // replication scope allows replacement, and does not require empty directories return; } - x.getLOG().debug("checking emptiness of " + targetPath.toString()); + logger.debug("checking emptiness of " + targetPath.toString()); if (fs.exists(targetPath)) { FileStatus[] status = fs.listStatus(targetPath, FileUtils.HIDDEN_FILES_PATH_FILTER); if (status.length > 0) { - x.getLOG().debug("Files inc. " + status[0].getPath().toString() + logger.debug("Files inc. 
" + status[0].getPath().toString() + " found in path : " + targetPath.toString()); throw new SemanticException(ErrorMsg.TABLE_DATA_EXISTS.getMsg()); } } } - private static String partSpecToString(Map partSpec) { + public static String partSpecToString(Map partSpec) { StringBuilder sb = new StringBuilder(); boolean firstTime = true; for (Map.Entry entry : partSpec.entrySet()) { @@ -489,7 +490,8 @@ private static String partSpecToString(Map partSpec) { return sb.toString(); } - private static void checkTable(Table table, ImportTableDesc tableDesc, ReplicationSpec replicationSpec, HiveConf conf) + public static void checkTable(Table table, ImportTableDesc tableDesc, + ReplicationSpec replicationSpec, HiveConf conf) throws SemanticException, URISyntaxException { // This method gets called only in the scope that a destination table already exists, so // we're validating if the table is an appropriate destination to import into @@ -739,7 +741,7 @@ private static void createRegularImportTasks( // ensure if destination is not empty only for regular import Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); - checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); loadTable(fromURI, table, false, tgtPath, replicationSpec,x); } // Set this to read because we can't overwrite any existing partitions @@ -774,7 +776,7 @@ private static void createRegularImportTasks( tablePath = wh.getDefaultTablePath(parentDb, tblDesc.getTableName()); } FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); - checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x); + checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x)); } } @@ -935,7 +937,7 @@ private static void createReplImportTasks( } - private static boolean isPartitioned(ImportTableDesc tblDesc) { + public static boolean isPartitioned(ImportTableDesc tblDesc) { return !(tblDesc.getPartCols() == null || tblDesc.getPartCols().isEmpty()); } @@ -943,7 +945,7 @@ private static boolean isPartitioned(ImportTableDesc tblDesc) { * Utility method that returns a table if one corresponding to the destination * tblDesc is found. Returns null if no such table is found. 
*/ - private static Table tableIfExists(ImportTableDesc tblDesc, Hive db) throws HiveException { + public static Table tableIfExists(ImportTableDesc tblDesc, Hive db) throws HiveException { try { return db.getTable(tblDesc.getDatabaseName(),tblDesc.getTableName()); } catch (InvalidTableException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 48d9c945c9..376787767b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,32 +17,26 @@ Licensed to the Apache Software Foundation (ASF) under one */ package org.apache.hadoop.hive.ql.parse; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; -import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; -import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -50,14 +44,10 @@ Licensed to the Apache Software Foundation (ASF) under one import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -82,7 +72,7 @@ Licensed to the Apache Software Foundation (ASF) under one private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; - private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { @@ -280,8 +270,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } if ((!evDump) && (tblNameOrPattern != null) && 
!(tblNameOrPattern.isEmpty())) { - // not an event dump, and table name pattern specified, this has to be a tbl-level dump - rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null, null, null)); + ReplLoadWork replLoadWork = + new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern); + rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -310,9 +301,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - for (FileStatus dir : dirsInLoadPath) { - analyzeDatabaseLoad(dbNameOrPattern, fs, dir); - } + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern); + rootTasks.add(TaskFactory.get(replLoadWork, conf)); + // + // for (FileStatus dir : dirsInLoadPath) { + // analyzeDatabaseLoad(dbNameOrPattern, fs, dir); + // } } else { // Event dump, each sub-dir is an individual event dump. // We need to guarantee that the directory listing we got is in order of evid. @@ -425,7 +419,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { for (String tableName : tablesUpdated.keySet()){ // weird - AlterTableDesc requires a HashMap to update props instead of a Map. - HashMap mapProp = new HashMap(); + HashMap mapProp = new HashMap<>(); String eventId = tablesUpdated.get(tableName).toString(); mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); @@ -439,7 +433,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { taskChainTail = updateReplIdTask; } for (String dbName : dbsUpdated.keySet()){ - Map mapProp = new HashMap(); + Map mapProp = new HashMap<>(); String eventId = dbsUpdated.get(dbName).toString(); mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), eventId); @@ -486,187 +480,6 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { return tasks; } - private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { - Hive hiveDb = Hive.get(); - Database db = hiveDb.getDatabase(dbName); - if (null != db) { - List allTables = hiveDb.getAllTables(dbName); - List allFunctions = hiveDb.getFunctions(dbName, "*"); - if (!allTables.isEmpty()) { - throw new InvalidOperationException( - "Database " + db.getName() + " is not empty. One or more tables exist."); - } - if (!allFunctions.isEmpty()) { - throw new InvalidOperationException( - "Database " + db.getName() + " is not empty. One or more functions exist."); - } - - return true; - } - - return false; - } - - private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) - throws SemanticException { - try { - // Path being passed to us is a db dump location. We go ahead and load as needed. - // dbName might be null or empty, in which case we keep the original db name for the new - // database creation - - // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc - // associated with that - // Then, we iterate over all subdirs, and create table imports for each. 
- - MetaData rv = new MetaData(); - try { - rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME)); - } catch (IOException e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); - } - - Database dbObj = rv.getDatabase(); - - if (dbObj == null) { - throw new IllegalArgumentException( - "_metadata file read did not contain a db object - invalid dump."); - } - - if ((dbName == null) || (dbName.isEmpty())) { - // We use dbName specified as long as it is not null/empty. If so, then we use the original - // name - // recorded in the thrift object. - dbName = dbObj.getName(); - } - - REPL_STATE_LOG.info("Repl Load: Started analyzing Repl Load for DB: {} from Dump Dir: {}, Dump Type: BOOTSTRAP", - dbName, dir.getPath().toUri().toString()); - - Task dbRootTask = null; - if (existEmptyDb(dbName)) { - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, dbObj.getParameters(), null); - dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); - } else { - CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); - createDbDesc.setName(dbName); - createDbDesc.setComment(dbObj.getDescription()); - createDbDesc.setDatabaseProperties(dbObj.getParameters()); - // note that we do not set location - for repl load, we want that auto-created. - - createDbDesc.setIfNotExists(false); - // If it exists, we want this to be an error condition. Repl Load is not intended to replace a - // db. - // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on. - dbRootTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf); - } - - rootTasks.add(dbRootTask); - FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs)); - - for (FileStatus tableDir : Collections2.filter(Arrays.asList(dirsInDbPath), new TableDirPredicate())) { - analyzeTableLoad( - dbName, null, tableDir.getPath().toUri().toString(), dbRootTask, null, null); - REPL_STATE_LOG.info("Repl Load: Analyzed table/view/partition load from path {}", - tableDir.getPath().toUri().toString()); - } - - //Function load - Path functionMetaDataRoot = new Path(dir.getPath(), FUNCTIONS_ROOT_DIR_NAME); - if (fs.exists(functionMetaDataRoot)) { - List functionDirectories = - Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs))); - for (FileStatus functionDir : functionDirectories) { - analyzeFunctionLoad(dbName, functionDir, dbRootTask); - REPL_STATE_LOG.info("Repl Load: Analyzed function load from path {}", - functionDir.getPath().toUri().toString()); - } - } - - REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl Load for DB: {} and created import (DDL/COPY/MOVE) tasks", - dbName); - } catch (Exception e) { - throw new SemanticException(e); - } - } - - private static class TableDirPredicate implements Predicate { - @Override - public boolean apply(FileStatus fileStatus) { - return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME); - } - } - - private void analyzeFunctionLoad(String dbName, FileStatus functionDir, - Task createDbTask) throws IOException, SemanticException { - URI fromURI = EximUtil - .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString())); - Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); - - try { - CreateFunctionHandler handler = new CreateFunctionHandler(); - List> tasksList = handler.handle( - new MessageHandler.Context( - dbName, null, fromPath.toString(), createDbTask, null, conf, db, - null, LOG) - 
); - - tasksList.forEach(task -> { - createDbTask.addDependentTask(task); - LOG.debug("Added {}:{} as a precursor of {}:{}", - createDbTask.getClass(), createDbTask.getId(), task.getClass(), - task.getId()); - - }); - inputs.addAll(handler.readEntities()); - } catch (Exception e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); - } - } - - private List> analyzeTableLoad( - String dbName, String tblName, String locn, - Task precursor, - Map dbsUpdated, Map tablesUpdated) throws SemanticException { - // Path being passed to us is a table dump location. We go ahead and load it in as needed. - // If tblName is null, then we default to the table name specified in _metadata, which is good. - // or are both specified, in which case, that's what we are intended to create the new table as. - if (dbName == null || dbName.isEmpty()) { - throw new SemanticException("Database name cannot be null for a table load"); - } - try { - // no location set on repl loads - boolean isLocationSet = false; - // all repl imports are non-external - boolean isExternalSet = false; - // bootstrap loads are not partition level - boolean isPartSpecSet = false; - // repl loads are not partition level - LinkedHashMap parsedPartSpec = null; - // no location for repl imports - String parsedLocation = null; - List> importTasks = new ArrayList>(); - - EximUtil.SemanticAnalyzerWrapperContext x = - new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG, - ctx); - ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, - (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x, - dbsUpdated, tablesUpdated); - - if (precursor != null) { - for (Task t : importTasks) { - precursor.addDependentTask(t); - LOG.debug("Added {}:{} as a precursor of {}:{}", - precursor.getClass(), precursor.getId(), t.getClass(), t.getId()); - } - } - - return importTasks; - } catch (Exception e) { - throw new SemanticException(e); - } - } - // REPL STATUS private void initReplStatus(ASTNode ast) { int numChildren = ast.getChildCount(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java index bb02c26619..e574a479b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java @@ -18,14 +18,19 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; +import java.util.HashSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; @@ -304,12 +309,13 @@ public String getDatabaseName() { return dbName; } - public Task getCreateTableTask(EximUtil.SemanticAnalyzerWrapperContext x) { + public Task getCreateTableTask(HashSet inputs, HashSet outputs, + HiveConf conf) { switch (getTableType()) { case TABLE: - return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), createTblDesc), x.getConf()); + return 
TaskFactory.get(new DDLWork(inputs, outputs, createTblDesc), conf); case VIEW: - return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), createViewDesc), x.getConf()); + return TaskFactory.get(new DDLWork(inputs, outputs, createViewDesc), conf); } return null; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java new file mode 100644 index 0000000000..aa29a5a293 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTrackerTest.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.Serializable; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) + public class TaskTrackerTest { + @Mock + private Task task; + + @Test + public void taskTrackerCompositionInitializesTheMaxTasksCorrectly() { + TaskTracker taskTracker = new TaskTracker(1); + assertTrue(taskTracker.canAddMoreTasks()); + taskTracker.addTask(task); + assertFalse(taskTracker.canAddMoreTasks()); + + TaskTracker taskTracker2 = new TaskTracker(taskTracker); + assertFalse(taskTracker2.canAddMoreTasks()); + } +} \ No newline at end of file
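A minimal sketch (not part of this patch) of how the TaskTracker budget is meant to compose across the db/table/partition load levels added above, using only the TaskTracker API introduced here; the budget value, tracker names and someRootTask are illustrative only.

  // illustrative only: one approximate task budget shared by nested trackers
  TaskTracker dbTracker = new TaskTracker(1000);          // overall budget for this load cycle
  TaskTracker tableTracker = new TaskTracker(dbTracker);  // child tracker starts with whatever budget remains
  tableTracker.addTask(someRootTask);                     // counts the root task plus all of its child tasks
  dbTracker.update(tableTracker);                         // fold the child's count (and ReplicationState, if set) back in
  if (!dbTracker.canAddMoreTasks()) {
    // stop generating tasks; the saved ReplicationState records where the next cycle should resume
  }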