diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 28058b3374..b81440acb3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -524,6 +524,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + "task increment that would cross the specified limit."), REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100, "Number of threads that will be used to dump partition data information during repl dump."), + REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false, + "Indicates whether replication should run data copy tasks during the repl load operation."), + REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000, + "This config indicates the threshold for the maximum number of data copy locations to be kept in memory. \n" + + "When the config 'hive.repl.data.copy.lazy' is set to true, this config is not considered."), REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to purge expired dump dirs."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 169fed857d..66b0d078c2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -256,15 +257,32 @@ private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IO return incrementalLoadAndVerify(dbName, replDbName); } + private Tuple bootstrapLoadAndVerify(String dbName, String replDbName, List withClauseOptions) + throws IOException { + return incrementalLoadAndVerify(dbName, replDbName, withClauseOptions); + } + private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException { Tuple dump = replDumpDb(dbName); loadAndVerify(replDbName, dbName, dump.lastReplId); return dump; } + private Tuple incrementalLoadAndVerify(String dbName, String replDbName, List withClauseOptions) + throws IOException { + Tuple dump = replDumpDb(dbName, withClauseOptions); + loadAndVerify(replDbName, dbName, dump.lastReplId, withClauseOptions); + return dump; + } + private Tuple replDumpDb(String dbName) throws IOException { + return replDumpDb(dbName, null); + } + + private Tuple replDumpDb(String dbName, List withClauseOptions) throws IOException { + String withClause = getWithClause(withClauseOptions); advanceDumpDir(); - String dumpCmd = "REPL DUMP " + dbName; + String dumpCmd = "REPL DUMP " + dbName + withClause; run(dumpCmd, driver); String dumpLocation = getResult(0, 0, driver); String lastReplId = getResult(0, 1, true, driver); @@ -272,8 +290,21 @@ private Tuple replDumpDb(String dbName) throws IOException { return new Tuple(dumpLocation, lastReplId); } + private String getWithClause(List withClauseOptions) { + if (withClauseOptions != null && !withClauseOptions.isEmpty()) { + return " with (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return ""; + } 
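Both options introduced above are ordinary HiveConf variables, so besides the per-policy WITH clause used by the tests below they can also be set on a HiveConf directly; a minimal sketch (the values shown are illustrative, not recommendations):

    HiveConf conf = new HiveConf();
    // Defer table/partition data copy from REPL DUMP to REPL LOAD.
    conf.setBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY, true);
    // Number of copy-location entries kept in memory before spilling to a backing file;
    // ignored when lazy data copy is enabled.
    conf.setIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE, 10000);
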
+ private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId) throws IOException { - run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror); + loadAndVerify(replDbName, sourceDbNameOrPattern, lastReplId, null); + } + + private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId, + List withClauseOptions) throws IOException { + String withClause = getWithClause(withClauseOptions); + run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName + withClause, driverMirror); verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror); return; } @@ -546,6 +577,61 @@ public void testBasicWithCM() throws Exception { verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); } + @Test + public void testBasicWithCMLazyCopy() throws Exception { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); + + String lazyCopyClause = " with ('" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true')"; + + advanceDumpDir(); + run("REPL DUMP " + dbName + lazyCopyClause, driver); + String replDumpLocn = getResult(0,0, driver); + String replDumpId = getResult(0,1,true, driver); + + // Table dropped after "repl dump" + run("DROP TABLE " + dbName + ".unptned", driver); + + // Partition dropped after "repl dump" + run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver); + + run("REPL LOAD " + dbName + " INTO " + replDbName + lazyCopyClause, driverMirror); + verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror); + + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", 
ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); + } + @Test public void testBootstrapLoadOnExistingDb() throws IOException { String testName = "bootstrapLoadOnExistingDb"; @@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException { verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); } + @Test + public void testIncrementalLoadLazyCopy() throws IOException { + String testName = "testIncrementalLoadLazyCopy"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + List lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'"); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName, lazyCopyClause); + + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[] {}; + + String unptnLocn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName, lazyCopyClause); + + Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + 
".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); + + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); + hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + } + @Test public void testIncrementalInserts() throws IOException { String testName = "incrementalInserts"; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 493a467ac6..04ead62aab 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); } + @Test + public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable { + List lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'"); + //creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, lazyCopyClause); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); + + + + replica.load(replicatedDbName, primaryDbName, lazyCopyClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult("us") + .run("select country from t2 where country = 'france'") + .verifyResult("france") + .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"}); + + String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + // Ckpt should be set on bootstrapped db. 
+ replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation); + + assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); + assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("create external table t4 as select id from t3") + .dump(primaryDbName, lazyCopyClause); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); + + replica.load(replicatedDbName, primaryDbName, lazyCopyClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyResult("t3") + .run("select id from t3") + .verifyResult("10") + .run("select id from t4") + .verifyResult("10"); + } + /** * @param sourceTableName -- Provide the fully qualified table name * @param replicaTableName -- Provide the fully qualified table name diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index d3e94134ab..b81d96141a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -50,7 +50,7 @@ public int execute() { work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); - tableExport.write(true); + tableExport.write(true, null, false); } catch (Exception e) { LOG.error("failed", e); setException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index 46f9bb3add..04bbd564be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject; import org.apache.hadoop.hive.ql.plan.Explain; import java.io.Serializable; @@ -27,10 +28,14 @@ @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) -public class DirCopyWork implements Serializable { +public class DirCopyWork implements Serializable, StringConvertibleObject { + private static final String URI_SEPARATOR = "#"; private static final long serialVersionUID = 1L; - private final Path fullyQualifiedSourcePath; - private final Path fullyQualifiedTargetPath; + private Path fullyQualifiedSourcePath; + private Path fullyQualifiedTargetPath; + + public DirCopyWork() { + } public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; @@ -51,4 +56,20 @@ public Path getFullyQualifiedSourcePath() { public Path getFullyQualifiedTargetPath() { return fullyQualifiedTargetPath; } + + @Override + public String convertToString() { + StringBuilder objInStr = new StringBuilder(); + objInStr.append(fullyQualifiedSourcePath) + .append(URI_SEPARATOR) + .append(fullyQualifiedTargetPath); + return objInStr.toString(); + } + + @Override + public void loadFromString(String objectInStr) { + String paths[] = objectInStr.split(URI_SEPARATOR); + this.fullyQualifiedSourcePath = new Path(paths[0]); + this.fullyQualifiedTargetPath = 
new Path(paths[1]); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index b15b326b1c..6674cbf9ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,11 +41,11 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.metastore.utils.Retry; -import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; @@ -58,7 +58,6 @@ import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -80,6 +79,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.io.IOUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +92,6 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.io.UnsupportedEncodingException; -import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -465,9 +464,9 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive String validTxnList = null; long waitUntilTime = 0; long bootDumpBeginReplId = -1; - List managedTableCopyPaths = Collections.emptyList(); - List extTableCopyWorks = Collections.emptyList(); + List tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); + // If we are bootstrapping ACID tables, we need to perform steps similar to a regular // bootstrap (See bootstrapDump() for more details. Only difference here is instead of // waiting for the concurrent transactions to finish, we start dumping the incremental events @@ -540,63 +539,75 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive dmd.setReplScope(work.replScope); } dmd.write(true); - // Examine all the tables if required. - if (shouldExamineTablesToDump() || (tableList != null)) { - // If required wait more for any transactions open at the time of starting the ACID bootstrap. 
- if (needBootstrapAcidTablesDuringIncrementalDump()) { - assert (waitUntilTime > 0); - validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); - } + int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + // Examine all the tables if required. + if (shouldExamineTablesToDump() || (tableList != null)) { + // If required wait more for any transactions open at the time of starting the ACID bootstrap. + if (needBootstrapAcidTablesDuringIncrementalDump()) { + assert (waitUntilTime > 0); + validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); + } /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata. We need to rewrite the metadata as the write id list will be changed. We can't reuse the previous write id as it might be invalid due to compaction. */ - Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); - Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME); - FileSystem fs = FileSystem.get(metadataPath.toUri(), conf); - try { - fs.delete(metadataPath, true); - } catch (FileNotFoundException e) { - // no worries - } - Path dbRootMetadata = new Path(metadataPath, dbName); - Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); - managedTableCopyPaths = new ArrayList<>(); - List extTableLocations = new LinkedList<>(); - try (Writer writer = new Writer(dumpRoot, conf)) { - for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { - try { - Table table = hiveDb.getTable(dbName, tableName); - - // Dump external table locations if required. - if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) - && shouldDumpExternalTableLocation()) { - extTableLocations.addAll(writer.dataLocationDump(table)); - } - - // Dump the table to be bootstrapped if required. - if (shouldBootstrapDumpTable(table)) { - HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, - hiveDb, tableTuple)); - } - if (tableList != null && isTableSatifiesConfig(table)) { - tableList.add(tableName); + Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME); + Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME); + FileSystem fs = FileSystem.get(metadataPath.toUri(), conf); + try { + fs.delete(metadataPath, true); + } catch (FileNotFoundException e) { + // no worries + } + Path dbRootMetadata = new Path(metadataPath, dbName); + Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + try (Writer writer = new Writer(dumpRoot, conf)) { + for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { + try { + Table table = hiveDb.getTable(dbName, tableName); + + // Dump external table locations if required. + if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) + && shouldDumpExternalTableLocation()) { + writer.dataLocationDump(table, extTableFileList, conf); + } + + // Dump the table to be bootstrapped if required. + if (shouldBootstrapDumpTable(table)) { + HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(table); + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); + } + if (tableList != null && isTableSatifiesConfig(table)) { + tableList.add(tableName); + } + } catch (InvalidTableException te) { + // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. + LOG.debug(te.getMessage()); } - } catch (InvalidTableException te) { - // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. - LOG.debug(te.getMessage()); } } + dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); } - dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); - extTableCopyWorks = dirLocationsToCopy(extTableLocations); + setDataCopyIterators(extTableFileList, managedTblList); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); + return lastReplId; + } + } + + private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) { + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + if (dataCopyAtLoad) { + work.setManagedTableCopyPathIterator(Collections.emptyList().iterator()); + work.setExternalTblCopyPathIterator(Collections.emptyList().iterator()); + LOG.info("Deferring table/partition data copy during dump. It should be done at load."); + } else { + work.setManagedTableCopyPathIterator(managedTableFileList); + work.setExternalTblCopyPathIterator(extTableFileList); } - work.setDirCopyIterator(extTableCopyWorks.iterator()); - work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); - work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); - return lastReplId; } private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) { @@ -750,23 +761,6 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } - private List dirLocationsToCopy(List sourceLocations) - throws HiveException { - if (sourceLocations.isEmpty()) { - return Collections.emptyList(); - } - List list = new ArrayList<>(sourceLocations.size()); - String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); - // this is done to remove any scheme related information that will be present in the base path - // specifically when we are replicating to cloud storage - Path basePath = ReplExternalTables.getExternalTableBaseDir(conf); - for (Path sourcePath : sourceLocations) { - Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); - list.add(new DirCopyWork(sourcePath, targetPath)); - } - return list; - } - Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { // bootstrap case @@ -778,8 +772,6 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) List tableList; LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); - List extTableCopyWorks = new ArrayList<>(); - List managedTableCopyPaths = new ArrayList<>(); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -790,95 +782,103 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, 
Hive hiveDb) //We can't reuse the previous write id as it might be invalid due to compaction metadataPath.getFileSystem(conf).delete(metadataPath, true); } - for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { - LOG.debug("Dumping db: " + dbName); - // TODO : Currently we don't support separate table list for each database. - tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); - Database db = hiveDb.getDatabase(dbName); - if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) { - // For replicated (target) database, until after first successful incremental load, the database will not be - // in a consistent state. Avoid allowing replicating this database to a new target. - throw new HiveException("Replication dump not allowed for replicated database" + - " with first incremental dump pending : " + dbName); - } - int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size(); - int estimatedNumFunctions = hiveDb.getAllFunctions().size(); - replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), - estimatedNumTables, - estimatedNumFunctions); - replLogger.startLog(); - Map metricMap = new HashMap<>(); - metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables); - metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions); - work.getMetricCollector().reportStageStart(getName(), metricMap); - Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb); - Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName); - dumpFunctionMetadata(dbName, dbRoot, hiveDb); - - String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); - Exception caught = null; - try (Writer writer = new Writer(dbRoot, conf)) { - List extTableLocations = new LinkedList<>(); - for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { - LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); - Table table = null; - - try { - HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); - table = tableTuple != null ? tableTuple.object : null; - if (shouldDumpExternalTableLocation() - && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { - LOG.debug("Adding table {} to external tables list", tblName); - extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); + int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { + LOG.debug("Dumping db: " + dbName); + // TODO : Currently we don't support separate table list for each database. + tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); + Database db = hiveDb.getDatabase(dbName); + if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) { + // For replicated (target) database, until after first successful incremental load, the database will not be + // in a consistent state. Avoid allowing replicating this database to a new target. + throw new HiveException("Replication dump not allowed for replicated database" + + " with first incremental dump pending : " + dbName); + } + int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size(); + int estimatedNumFunctions = hiveDb.getAllFunctions().size(); + replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), + estimatedNumTables, + estimatedNumFunctions); + replLogger.startLog(); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions); + work.getMetricCollector().reportStageStart(getName(), metricMap); + Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb); + Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName); + dumpFunctionMetadata(dbName, dbRoot, hiveDb); + + String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); + Exception caught = null; + try (Writer writer = new Writer(dbRoot, conf)) { + List extTableLocations = new LinkedList<>(); + for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { + LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); + Table table = null; + + try { + HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); + table = tableTuple != null ? tableTuple.object : null; + if (shouldDumpExternalTableLocation() + && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { + LOG.debug("Adding table {} to external tables list", tblName); + writer.dataLocationDump(tableTuple.object, extTableFileList, conf); + } + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, + bootDumpBeginReplId, + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); + } catch (InvalidTableException te) { + // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. + LOG.debug(te.getMessage()); + } + dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb); + if (tableList != null && isTableSatifiesConfig(table)) { + tableList.add(tblName); } - managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, - bootDumpBeginReplId, - hiveDb, tableTuple)); - } catch (InvalidTableException te) { - // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. - LOG.debug(te.getMessage()); - } - dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb); - if (tableList != null && isTableSatifiesConfig(table)) { - tableList.add(tblName); } - } - dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); - extTableCopyWorks = dirLocationsToCopy(extTableLocations); - } catch (Exception e) { - caught = e; - } finally { - try { - Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); + dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); } catch (Exception e) { - if (caught == null) { - throw e; - } else { - LOG.error("failed to reset the db state for " + uniqueKey - + " on failure of repl dump", e); + caught = e; + } finally { + try { + Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); + } catch (Exception e) { + if (caught == null) { + throw e; + } else { + LOG.error("failed to reset the db state for " + uniqueKey + + " on failure of repl dump", e); + throw caught; + } + } + if (caught != null) { throw caught; } } - if (caught != null) { - throw caught; - } + replLogger.endLog(bootDumpBeginReplId.toString()); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId); } - replLogger.endLog(bootDumpBeginReplId.toString()); - work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId); + Long bootDumpEndReplId = currentNotificationId(hiveDb); + LOG.info("Preparing to return {},{}->{}", + dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); + dmd.write(true); + setDataCopyIterators(extTableFileList, managedTblList); + return bootDumpBeginReplId; } - Long bootDumpEndReplId = currentNotificationId(hiveDb); - LOG.info("Preparing to return {},{}->{}", - dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); - long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); - dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); - dmd.write(true); + } - work.setDirCopyIterator(extTableCopyWorks.iterator()); - work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); - return bootDumpBeginReplId; + 
private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) throws IOException { + Path backingFile = new Path(dumpRoot, fileName); + return work.getFileList(backingFile, cacheSize, conf, true); } + private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { try { return dumpMetaData.getEventFrom() != null; @@ -921,9 +921,10 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive return dbRoot; } - List dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, Path dbRootData, long lastReplId, Hive hiveDb, - HiveWrapper.Tuple
tuple) throws Exception { + HiveWrapper.Tuple
tuple, FileList managedTbleList, boolean dataCopyAtLoad) + throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = @@ -941,16 +942,18 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - List managedTableCopyPaths = new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, - conf, mmCtx).write(false); + new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write( + false, managedTbleList, dataCopyAtLoad); work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); + } + + private boolean dataCopyRequired(TableSpec tableSpec) { if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) || Utils.shouldDumpMetaDataOnly(conf)) { - return Collections.emptyList(); + return false; } - return managedTableCopyPaths; + return true; } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index bccaf9417b..e0b61a5d6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -24,14 +24,17 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -53,12 +56,14 @@ private static boolean testInjectDumpDirAutoIncrement = false; static boolean testDeletePreviousDumpMetaPath = false; private Integer maxEventLimit; - private transient Iterator dirCopyIterator; - private transient Iterator managedTableCopyPathIterator; + private transient Iterator externalTblCopyPathIterator; + private transient Iterator managedTblCopyPathIterator; private Path currentDumpPath; private List resultValues; private boolean shouldOverwrite; private transient ReplicationMetricCollector metricCollector; + private ReplicationSpec replicationSpec; + private FileList mockedFileList; public static void injectNextDumpDirForTest(String dumpDir) { injectNextDumpDirForTest(dumpDir, false); @@ -130,22 +135,22 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception } } - public void setDirCopyIterator(Iterator dirCopyIterator) { - if (this.dirCopyIterator != null) { - throw new IllegalStateException("Dir Copy iterator has already been initialized"); + public void setExternalTblCopyPathIterator(Iterator externalTblCopyPathIterator) { + if (this.externalTblCopyPathIterator != null) { + throw new IllegalStateException("External table copy path 
iterator has already been initialized"); } - this.dirCopyIterator = dirCopyIterator; + this.externalTblCopyPathIterator = externalTblCopyPathIterator; } - public void setManagedTableCopyPathIterator(Iterator managedTableCopyPathIterator) { - if (this.managedTableCopyPathIterator != null) { + public void setManagedTableCopyPathIterator(Iterator managedTblCopyPathIterator) { + if (this.managedTblCopyPathIterator != null) { throw new IllegalStateException("Managed table copy path iterator has already been initialized"); } - this.managedTableCopyPathIterator = managedTableCopyPathIterator; + this.managedTblCopyPathIterator = managedTblCopyPathIterator; } public boolean tableDataCopyIteratorsInitialized() { - return dirCopyIterator != null || managedTableCopyPathIterator != null; + return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null; } public Path getCurrentDumpPath() { @@ -169,8 +174,9 @@ public void setResultValues(List resultValues) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) { - DirCopyWork dirCopyWork = dirCopyIterator.next(); + while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + DirCopyWork dirCopyWork = new DirCopyWork(); + dirCopyWork.loadFromString(externalTblCopyPathIterator.next()); Task task = TaskFactory.get(dirCopyWork, conf); tasks.add(task); tracker.addTask(task); @@ -184,8 +190,12 @@ public void setResultValues(List resultValues) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { - EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next(); + while (managedTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + ReplicationSpec replSpec = new ReplicationSpec(); + replSpec.setIsReplace(true); + replSpec.setInReplicationScope(true); + EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec); + managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next()); Task copyTask = ReplCopyTask.getLoadCopyTask( managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite); @@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() { public void setMetricCollector(ReplicationMetricCollector metricCollector) { this.metricCollector = metricCollector; } + + public ReplicationSpec getReplicationSpec() { + return replicationSpec; + } + + public void setReplicationSpec(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + + public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException { + return (mockedFileList == null) ? 
new FileList(backingFile, cacheSize, conf) : mockedFileList; + } + + public void setMockedFileList(FileList mockedFileList) { + this.mockedFileList = mockedFileList; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index a47a728a77..b6e085891d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -45,7 +46,6 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -135,10 +135,10 @@ private boolean shouldWrite() { * table if the table is partitioned and the partition location is outside the table. * It returns list of all the external table locations. */ - List dataLocationDump(Table table) throws InterruptedException, IOException, HiveException { - List extTableLocations = new LinkedList<>(); + void dataLocationDump(Table table, FileList fileList, HiveConf conf) + throws InterruptedException, IOException, HiveException { if (!shouldWrite()) { - return extTableLocations; + return; } if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { throw new IllegalArgumentException( @@ -148,7 +148,7 @@ private boolean shouldWrite() { Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - extTableLocations.add(fullyQualifiedDataLocation); + dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); if (table.isPartitioned()) { List partitions; try { @@ -157,7 +157,7 @@ private boolean shouldWrite() { if (e.getCause() instanceof NoSuchObjectException) { // If table is dropped when dump in progress, just skip partitions data location dump LOG.debug(e.getMessage()); - return extTableLocations; + return; } throw e; } @@ -170,11 +170,17 @@ private boolean shouldWrite() { fullyQualifiedDataLocation = PathBuilder .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - extTableLocations.add(fullyQualifiedDataLocation); + dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); } } } - return extTableLocations; + } + + private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf) + throws HiveException { + Path basePath = getExternalTableBaseDir(conf); + Path targetPath = externalTableDataPath(conf, basePath, sourcePath); + fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString()); } private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a48cd5b3fc..2635a67ce7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ 
-48,12 +48,14 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; @@ -182,6 +184,7 @@ private int executeBootStrapLoad() throws Exception { Context loadContext = new Context(work.dumpDirectory, conf, getHive(), work.sessionStateLineageState, context); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); + addLazyDataCopyTask(loadTaskTracker); /* for now for simplicity we are doing just one directory ( one database ), come back to use of multiple databases once we have the basic flow to chain creating of tasks in place for @@ -330,6 +333,22 @@ a database ( directory ) return 0; } + private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException { + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + if (dataCopyAtLoad) { + if (work.getExternalTableDataCopyItr() == null) { + Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILE_LIST_EXTERNAL); + try(FileList fileList = new FileList(extTableBackingFile, 0, conf)) { + work.setExternalTableDataCopyItr(fileList); + } + } + if (childTasks == null) { + childTasks = new ArrayList<>(); + } + childTasks.addAll(work.externalTableCopyTasks(loadTaskTracker, conf)); + } + } + private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker, BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker) throws Exception { @@ -545,6 +564,7 @@ private int executeIncrementalLoad() throws Exception { List> childTasks = new ArrayList<>(); int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); TaskTracker tracker = new TaskTracker(maxTasks); + addLazyDataCopyTask(tracker); childTasks.add(builder.build(context, getHive(), LOG, tracker)); // If there are no more events to be applied, add a task to update the last.repl.id of the // target database to the event id of the last event considered by the dump. 
Next diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 43bf365b4f..4050235233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -22,26 +22,35 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ReplLoadWork implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(ReplLoadWork.class); final String dbNameToLoadIn; final ReplScope currentReplScope; final String dumpDirectory; @@ -56,6 +65,7 @@ private final transient BootstrapEventsIterator bootstrapIterator; private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder; private transient Task rootTask; + private Iterator externalTableDataCopyItr; /* these are sessionState objects that are copied over to work to allow for parallel execution. 
@@ -176,4 +186,29 @@ public ReplicationMetricCollector getMetricCollector() { public Long getDumpExecutionId() { return dumpExecutionId; } + + public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { + return Collections.emptyList(); + } + List> tasks = new ArrayList<>(); + while (externalTableDataCopyItr.hasNext() && tracker.canAddMoreTasks()) { + DirCopyWork dirCopyWork = new DirCopyWork(); + dirCopyWork.loadFromString(externalTableDataCopyItr.next()); + Task task = TaskFactory.get(dirCopyWork, conf); + tasks.add(task); + tracker.addTask(task); + LOG.debug("Added task for {}", dirCopyWork); + } + LOG.info("Added total {} tasks for external table locations copy.", tasks.size()); + return tasks; + } + + public Iterator getExternalTableDataCopyItr() { + return externalTableDataCopyItr; + } + + public void setExternalTableDataCopyItr(Iterator externalTableDataCopyItr) { + this.externalTableDataCopyItr = externalTableDataCopyItr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 025457b73f..c4cfcf96f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -285,12 +285,12 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc : LoadFileType.OVERWRITE_EXISTING); stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); } - + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)), stagingDir, - context.hiveConf, false, false + context.hiveConf, copyAtLoad, false ); Task movePartitionTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 9e236fd697..35ea777ef5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -56,7 +57,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.BitSet; import java.util.Collections; import java.util.HashSet; @@ -298,8 +298,9 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent + table.getCompleteName() + " with source location: " + dataPath.toString() + " and target location " + tgtPath.toString()); + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, - false, false); + copyAtLoad, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), 
null, null, false); if (AcidUtils.isTransactionalTable(table)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java new file mode 100644 index 0000000000..dc1a2c07f3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.LinkedBlockingQueue; + + +/** + * A file backed list of Strings which is in-memory till the threshold. + */ +public class FileList implements AutoCloseable, Iterator { + private static final Logger LOG = LoggerFactory.getLogger(FileList.class); + private static int fileListStreamerID = 0; + private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; + + private LinkedBlockingQueue cache; + private volatile boolean thresholdHit = false; + private int thresholdPoint; + private float thresholdFactor = 0.9f; + private Path backingFile; + private FileListStreamer fileListStreamer; + private String nextElement; + private boolean noMoreElement; + private HiveConf conf; + private BufferedReader backingFileReader; + + + public FileList(Path backingFile, int cacheSize, HiveConf conf) throws IOException { + this.backingFile = backingFile; + if (cacheSize > 0) { + // Cache size must be > 0 for this list to be used for the write operation. + this.cache = new LinkedBlockingQueue<>(cacheSize); + fileListStreamer = new FileListStreamer(cache, backingFile, conf); + LOG.debug("File list backed by {} can be used for write operation.", backingFile); + } else { + thresholdHit = true; + } + this.conf = conf; + thresholdPoint = getThreshold(cacheSize); + } + + /** + * Only add operation is safe for concurrent operations. 
+ */ + public void add(String entry) throws SemanticException { + if (thresholdHit && !fileListStreamer.isAlive()) { + throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); + } + try { + cache.put(entry); + } catch (InterruptedException e) { + throw new SemanticException(e); + } + if (!thresholdHit && cache.size() >= thresholdPoint) { + initStoreToFile(cache.size()); + } + } + + @Override + public boolean hasNext() { + if (!thresholdHit) { + return (cache != null && !cache.isEmpty()); + } + if (nextElement != null) { + return true; + } + if (noMoreElement) { + return false; + } + nextElement = readNextLine(); + if (nextElement == null) { + noMoreElement = true; + } + return !noMoreElement; + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException("No more element in the list backed by " + backingFile); + } + String retVal = nextElement; + nextElement = null; + return thresholdHit ? retVal : cache.poll(); + } + + private synchronized void initStoreToFile(int cacheSize) { + if (!thresholdHit) { + fileListStreamer.setName(getNextID()); + fileListStreamer.setDaemon(true); + fileListStreamer.start(); + thresholdHit = true; + LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize); + } + } + + private String readNextLine() { + String nextElement = null; + try { + if (backingFileReader == null) { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + if (fs.exists(backingFile)) { + backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile))); + } + } + nextElement = (backingFileReader == null) ? null : backingFileReader.readLine(); + } catch (IOException e) { + LOG.error("Unable to read list from backing file " + backingFile, e); + } + return nextElement; + } + + @Override + public void close() throws IOException { + if (thresholdHit && fileListStreamer != null) { + fileListStreamer.close(); + } + if (backingFileReader != null) { + backingFileReader.close(); + } + LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit); + } + + private static String getNextID() { + if (Integer.MAX_VALUE == fileListStreamerID) { + //reset the counter + fileListStreamerID = 0; + } + fileListStreamerID++; + return FILE_LIST_STREAMER_PREFIX + fileListStreamerID; + } + + public int getThreshold(int cacheSize) { + boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor); + } + + @VisibleForTesting + public boolean isStreamingToFile() { + return isStreamingInitialized() && fileListStreamer.isAlive(); + } + + @VisibleForTesting + public boolean isStreamingInitialized() { + return fileListStreamer.isInitialized(); + } + + @VisibleForTesting + public boolean isStreamingClosedProperly() { + return fileListStreamer.isInitialized() && !fileListStreamer.isAlive() && fileListStreamer.isValid(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java new file mode 100644 index 0000000000..0d956fa33a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class FileListStreamer extends Thread implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class); + private static BufferedWriter backingFileWriterInTest; + private static final long TIMEOUT_IN_SECS = 5L; + private volatile boolean signalTostop; + private final LinkedBlockingQueue cache; + private Path backingFile; + private Configuration conf; + private BufferedWriter backingFileWriter; + private volatile boolean valid = true; + private final Object COMPLETION_LOCK = new Object(); + private volatile boolean completed = false; + private volatile boolean initialized = false; + + + + public FileListStreamer(LinkedBlockingQueue cache, Path backingFile, Configuration conf) throws IOException { + this.cache = cache; + this.backingFile = backingFile; + this.conf = conf; + } + + private void lazyInit() throws IOException { + if (backingFileWriterInTest == null) { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile))); + } else { + backingFileWriter = backingFileWriterInTest; + } + initialized = true; + LOG.info("Initialized a file based store to save a list at: {}", backingFile); + } + + public boolean isValid() { + return valid; + } + + // Blocks for remaining entries to be flushed to file. + @Override + public void close() throws IOException { + signalTostop = true; + synchronized (COMPLETION_LOCK) { + while (motiveToWait()) { + try { + COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS)); + } catch (InterruptedException e) { + // no-op + } + } + } + if (!isValid()) { + throw new IOException("File list is not in a valid state:" + backingFile); + } + } + + private boolean motiveToWait() { + return !completed && valid; + } + + @Override + public void run() { + try { + lazyInit(); + } catch (IOException e) { + valid = false; + throw new RuntimeException("Unable to initialize the file list streamer", e); + } + boolean exThrown = false; + while (!exThrown && (!signalTostop || !cache.isEmpty())) { + try { + String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS); + if (nextEntry != null) { + backingFileWriter.write(nextEntry); + backingFileWriter.newLine(); + LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile); + } + } catch (Exception iEx) { + if (!(iEx instanceof InterruptedException)) { + // not draining any more. 
Inform the producer to avoid OOM. + valid = false; + LOG.error("Exception while saving the list to file " + backingFile, iEx); + exThrown = true; + } + } + } + try{ + closeBackingFile(); + completed = true; + } finally { + synchronized (COMPLETION_LOCK) { + COMPLETION_LOCK.notify(); + } + } + LOG.info("Completed the file list streamer backed by: {}", backingFile); + } + + private void closeBackingFile() { + try { + backingFileWriter.close(); + LOG.debug("Closed the file list backing file: {}", backingFile); + } catch (IOException e) { + LOG.error("Exception while closing the file list backing file", e); + valid = false; + } + } + + @VisibleForTesting + public static void setBackingFileWriterInTest(BufferedWriter bufferedWriter) { + backingFileWriterInTest = bufferedWriter; + } + + @VisibleForTesting + public boolean isInitialized() { + return initialized; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index bccf56af9e..ecf51a9f32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -55,7 +54,6 @@ import org.apache.thrift.TException; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java new file mode 100644 index 0000000000..8c8120ba6d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +public interface StringConvertibleObject { + + public String convertToString (); + + public void loadFromString (String objectInStr); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index fb6a38cd43..b35f7ab6ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -74,6 +75,8 @@ public static final String METADATA_NAME = "_metadata"; public static final String FILES_NAME = "_files"; + public static final String FILE_LIST = "_file_list"; + public static final String FILE_LIST_EXTERNAL = "_file_list_external"; public static final String DATA_PATH_NAME = "data"; public static final String METADATA_PATH_NAME = "metadata"; @@ -161,12 +164,17 @@ public void setOpenTxnTask(Task openTxnTask) { /** * Wrapper class for mapping source and target path for copying managed table data. */ - public static class ManagedTableCopyPath { + public static class ManagedTableCopyPath implements StringConvertibleObject { + private static final String URI_SEPARATOR = "#"; private ReplicationSpec replicationSpec; private static boolean nullSrcPathForTest = false; private Path srcPath; private Path tgtPath; + public ManagedTableCopyPath(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) { this.replicationSpec = replicationSpec; if (srcPath == null) { @@ -215,6 +223,26 @@ public static void setNullSrcPath(HiveConf conf, boolean aNullSrcPath) { nullSrcPathForTest = aNullSrcPath; } } + + @Override + public String convertToString() { + StringBuilder objInStr = new StringBuilder(); + objInStr.append(srcPath) + .append(URI_SEPARATOR) + .append(tgtPath); + return objInStr.toString(); + } + + @Override + public void loadFromString(String objectInStr) { + String paths[] = objectInStr.split(URI_SEPARATOR); + this.srcPath = new Path(paths[0]); + this.tgtPath = new Path(paths[1]); + } + + private String getEmptyOrString(String str) { + return (str == null) ? 
"" : str; + } } private EximUtil() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 9cd9137ce6..55c84277fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -501,8 +501,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), - isSkipTrash, needRecycle, copyToMigratedTxnTable, false); + isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -648,8 +649,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath, - x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, false); + x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 5c8d0edd77..8675ace7c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; -import java.text.Collator; import java.util.Map; /** @@ -54,6 +53,10 @@ private boolean isRepl = false; private boolean isMetadataOnlyForExternalTables = false; + public void setInReplicationScope(boolean inReplicationScope) { + isInReplicationScope = inReplicationScope; + } + // Key definitions related to replication. 
public enum KEY { REPL_SCOPE("repl.scope"), @@ -122,7 +125,7 @@ public ReplicationSpec(String fromId, String toId) { public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isReplace) { - this.isInReplicationScope = isInReplicationScope; + this.setInReplicationScope(isInReplicationScope); this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; @@ -133,15 +136,15 @@ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, public ReplicationSpec(Function keyFetcher) { String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString()); - this.isInReplicationScope = false; + this.setInReplicationScope(false); this.isMetadataOnly = false; this.specType = Type.DEFAULT; if (scope != null) { if (scope.equalsIgnoreCase("metadata")) { this.isMetadataOnly = true; - this.isInReplicationScope = true; + this.setInReplicationScope(true); } else if (scope.equalsIgnoreCase("all")) { - this.isInReplicationScope = true; + this.setInReplicationScope(true); } } this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString()); @@ -227,7 +230,7 @@ public boolean apply(@Nullable Partition partition) { private void init(ASTNode node){ // -> ^(TOK_REPLICATION $replId $isMetadataOnly) - isInReplicationScope = true; + setInReplicationScope(true); eventId = PlanUtils.stripQuotes(node.getChild(0).getText()); if ((node.getChildCount() > 1) && node.getChild(1).getText().toLowerCase().equals("metadata")) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 3de583276e..65d4fbf023 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; @@ -74,7 +75,8 @@ this.callersSession = SessionState.get(); } - List write(final ReplicationSpec forReplicationSpec, boolean isExportTask) + List write(final ReplicationSpec forReplicationSpec, boolean isExportTask, + FileList fileList, boolean dataCopyAtLoad) throws InterruptedException, HiveException { List> futures = new LinkedList<>(); List managedTableCopyPaths = new LinkedList<>(); @@ -114,16 +116,19 @@ String threadName = Thread.currentThread().getName(); LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName); try { - // Data Copy in case of ExportTask + // Data Copy in case of ExportTask or when dataCopyAtLoad is true List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); - Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName); + Path rootDataDumpDir = isExportTask + ? 
paths.partitionMetadataExportDir(partitionName) : paths.partitionDataExportDir(partitionName); new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) - .export(isExportTask); + .export(isExportTask, dataCopyAtLoad); Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); - return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), - dataDumpDir); + if (!(isExportTask || dataCopyAtLoad)) { + fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), + dataDumpDir).convertToString()); + } } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -132,11 +137,7 @@ consumer.shutdown(); for (Future future : futures) { try { - Object retVal = future.get(); - if (retVal != null) { - ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal; - managedTableCopyPaths.add(managedTableCopyPath); - } + future.get(); } catch (Exception e) { LOG.error("failed", e.getCause()); throw new HiveException(e.getCause().getMessage(), e.getCause()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index b11afe80a1..66cf494dd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -96,8 +97,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.mmCtx = mmCtx; } - public List write(boolean isExportTask) throws SemanticException { - List managedTableCopyPaths = Collections.emptyList(); + public void write(boolean isExportTask, FileList fileList, boolean dataCopyAtLoad) throws SemanticException { if (tableSpec == null) { writeMetaData(null); } else if (shouldExport()) { @@ -105,12 +105,11 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly() && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { - managedTableCopyPaths = writeData(withPartitions, isExportTask); + writeData(withPartitions, isExportTask, fileList, dataCopyAtLoad); } } else if (isExportTask) { throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } - return managedTableCopyPaths; } private PartitionIterable getPartitions() throws SemanticException { @@ -161,26 +160,26 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio } } - private List writeData(PartitionIterable partitions, boolean isExportTask) + private void writeData(PartitionIterable partitions, boolean isExportTask, FileList fileList, boolean dataCopyAtLoad) throws SemanticException { - List managedTableCopyPaths = new ArrayList<>(); try { if (tableSpec.tableHandle.isPartitioned()) { if (partitions == null) { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.getTableName().getTable()); } - managedTableCopyPaths = new 
PartitionExport( - paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask, + fileList, dataCopyAtLoad); } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - managedTableCopyPaths.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(), - paths.dataExportDir())); + if (!(isExportTask || dataCopyAtLoad)) { + fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(), + paths.dataExportDir()).convertToString()); + } new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(isExportTask); + .export(isExportTask, (dataCopyAtLoad)); } - return managedTableCopyPaths; } catch (Exception e) { throw new SemanticException(e.getMessage(), e); } @@ -244,6 +243,10 @@ Path partitionMetadataExportDir(String partitionName) throws SemanticException { return exportDir(new Path(metadataExportRootDir(), partitionName)); } + Path partitionDataExportDir(String partitionName) throws SemanticException { + return exportDir(new Path(dataExportRootDir(), partitionName)); + } + /** * Access to the {@link #metadataExportRootDir} should only be done via this method * since the creation of the directory is delayed until we figure out if we want diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index 82bf384a74..e758c8da53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -38,7 +38,9 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; @@ -88,6 +90,23 @@ public long toEventId() { return event.getEventId(); } + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { + Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); + FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + protected void writeEncodedDumpFiles(Context withinContext, Iterable files, Path dataPath) + throws IOException { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { + for (String file : files) { + fileListWriter.write(file); + fileListWriter.newLine(); + } + } + } + protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext) throws IOException, LoginException, MetaException, HiveFatalException { HiveConf hiveConf = withinContext.hiveConf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 6462e17515..3120c960e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import 
org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import java.io.File; import java.util.Iterator; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -95,6 +96,7 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec, withinContext.hiveConf); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Iterator partitionFilesIter = apm.getPartitionFilesIter().iterator(); // We expect one to one mapping between partitions and file iterators. For external table, this @@ -103,9 +105,15 @@ public void handle(Context withinContext) throws Exception { for (Partition qlPtn : qlPtns) { Iterable files = partitionFilesIter.next().getFiles(); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, qlPtn, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path ptnDataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtn.getName()); + writeEncodedDumpFiles(withinContext, files, ptnDataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, qlPtn, file, withinContext); + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 36369db69d..fcb919a3e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import javax.security.auth.login.LoginException; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -56,11 +57,17 @@ CommitTxnMessage eventMessage(String stringRepresentation) { return deserializer.getCommitTxnMessage(stringRepresentation); } - private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable files, Context withinContext) + private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable files, Context withinContext, + Path dataPath) throws IOException, LoginException, MetaException, HiveFatalException { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, ptn, file, withinContext); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, ptn, file, withinContext); + } } } @@ -81,10 +88,13 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met withinContext.hiveConf); if ((null == qlPtns) || qlPtns.isEmpty()) { - writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext); + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { - writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext); + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtns.get(idx).getName()); + writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), 
withinContext, dataPath); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index c732b21cdc..ed78bc03c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.ql.metadata.Table; @@ -26,10 +26,6 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - class CreateTableHandler extends AbstractEventHandler { CreateTableHandler(NotificationEvent event) { @@ -79,23 +75,23 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec, withinContext.hiveConf); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Iterable files = eventMessage.getFiles(); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, null, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, null, file, withinContext); + } } } withinContext.createDmd(this).write(); } - private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { - FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); - } - @Override public DumpType dumpType() { return DumpType.EVENT_CREATE_TABLE; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 09662dacf9..704b9960e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import java.io.File; import java.util.Collections; import java.util.List; @@ -76,6 +77,8 @@ public void handle(Context withinContext) throws Exception { withinContext.hiveConf); Iterable files = eventMessage.getFiles(); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + /* * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables. * But, Insert event is generated for each partition to which the data is inserted. @@ -83,9 +86,20 @@ public void handle(Context withinContext) throws Exception { */ Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? 
null : qlPtns.get(0); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, ptn, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path dataPath = null; + if ((null == qlPtns) || qlPtns.isEmpty()) { + dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + } else { + dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtns.get(0).getName()); + } + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, ptn, file, withinContext); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 5754bf2656..80bfdfb3db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import java.io.BufferedWriter; +import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStreamWriter; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -28,14 +31,22 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +79,11 @@ public FileOperations(List dataPathList, Path exportRootDataDir, String di exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } - public void export(boolean isExportTask) throws Exception { + public void export(boolean isExportTask, boolean dataCopyAtLoad) throws Exception { if (isExportTask) { copyFiles(); + } else if (dataCopyAtLoad) { + exportFilesAsList(); } else { validateSrcPathListExists(); } @@ -165,4 +178,83 @@ private void validateSrcPathListExists() throws IOException, LoginException { throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); } } + + /** + * This needs the root data directory to which the data needs to be exported to. + * The data export here is a list of files either in table/partition that are written to the _files + * in the exportRootDataDir provided. 
+ */ + void exportFilesAsList() throws SemanticException, IOException, LoginException { + if (dataPathList.isEmpty()) { + return; + } + Retry retryable = new Retry(IOException.class) { + @Override + public Void execute() throws Exception { + try (BufferedWriter writer = writer()) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); + } + // in case of io error, reset the file system object + FileSystem.closeAllForUGI(Utils.getUGI()); + dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); + if (exportFileSystem.exists(exportPath)) { + exportFileSystem.delete(exportPath, true); + } + throw e; + } + return null; + } + }; + try { + retryable.run(); + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) + throws IOException { + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isDirectory()) { + // Write files inside the sub-directory. + Path subDir = fileStatus.getPath(); + writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir)); + } else { + writer.write(encodedUri(fileStatus, encodedSubDirs)); + writer.newLine(); + } + } + } + + private BufferedWriter writer() throws IOException { + Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); + logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile); + return new BufferedWriter( + new OutputStreamWriter(exportFileSystem.create(exportToFile)) + ); + } + + private String encodedSubDir(String encodedParentDirs, Path subDir) { + if (null == encodedParentDirs) { + return subDir.getName(); + } else { + return encodedParentDirs + Path.SEPARATOR + subDir.getName(); + } + } + + private String encodedUri(FileStatus fileStatus, String encodedSubDir) + throws IOException { + ReplChangeManager replChangeManager = ReplChangeManager.getInstance(); + Path currentDataFilePath = fileStatus.getPath(); + String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem); + return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 8454b9c420..f90f1eb284 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -21,9 +21,9 @@ import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; @@ -121,8 +121,8 @@ 
public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
     when(queryState.getConf()).thenReturn(conf);
     when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
     when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
-    when(HiveConf.getVar(conf,
-        HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
+    when(HiveConf.getVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
+    when(conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE)).thenReturn(10000);

     whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
     whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
@@ -131,16 +131,15 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;

       @Override
-      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
-                                                    Path dbRootMetadata, Path dbRootData,
-                                                    long lastReplId, Hive hiveDb,
-                                                    HiveWrapper.Tuple<Table> tuple)
+      void dumpTable(String dbName, String tblName, String validTxnList,
+                     Path dbRootMetadata, Path dbRootData,
+                     long lastReplId, Hive hiveDb,
+                     HiveWrapper.Tuple<Table>
tuple, FileList managedTableDirFileList, boolean dataCopyAtLoad) throws Exception { tableDumpCount++; if (tableDumpCount > 1) { throw new TestException(); } - return Collections.emptyList(); } }; @@ -148,6 +147,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw ReplDumpWork replDumpWork = new ReplDumpWork(replScope, null, "", ""); replDumpWork.setMetricCollector(metricCollector); + replDumpWork.setMockedFileList(mock(FileList.class)); task.setWork(replDumpWork); try { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java new file mode 100644 index 0000000000..52b049ede2 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({LoggerFactory.class}) +public class TestFileList { + + @Mock + private HiveConf hiveConf; + + @Mock + BufferedWriter bufferedWriter; + + @Test + public void testNoStreaming() throws Exception { + Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false); + Path backingFile = new Path("/tmp/backingFile"); + FileList fileList = new FileList(backingFile, 100, hiveConf); + fileList.add("Entry1"); + fileList.add("Entry2"); + assertFalse(fileList.isStreamingToFile()); + } + + @Test + public void testAlwaysStreaming() throws Exception { + Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(true); + FileListStreamer.setBackingFileWriterInTest(bufferedWriter); + Path backingFile = new Path("/tmp/backingFile"); + FileList fileList = new FileList(backingFile, 100, hiveConf); + assertFalse(fileList.isStreamingInitialized()); + fileList.add("Entry1"); + waitForStreamingInitialization(fileList); + assertTrue(fileList.isStreamingToFile()); + fileList.close(); + waitForStreamingClosure(fileList); + 
}
+
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 5, hiveConf);
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileList.isStreamingInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+  }
+
+  @Test
+  public void testConcurrentAdd() throws Exception {
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(false);
+    FileListStreamer.setBackingFileWriterInTest(bufferedWriter);
+    Path backingFile = new Path("/tmp/backingFile");
+    FileList fileList = new FileList(backingFile, 100, hiveConf);
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+
+    for (int i = 1; i <= numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (SemanticException e) {
+          throw new RuntimeException("Unable to add to file list.");
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+    waitForStreamingInitialization(fileList);
+    fileList.close();
+    waitForStreamingClosure(fileList);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
+  }
+
+  private void waitForStreamingInitialization(FileList fileList) throws InterruptedException {
+    long sleepTime = 1000L;
+    int iter = 0;
+    while (!fileList.isStreamingInitialized()) {
+      Thread.sleep(sleepTime);
+      iter++;
+      if (iter == 5) {
+        throw new IllegalStateException("File Streamer not initialized within 5s.");
+      }
+    }
+  }
+
+  private void waitForStreamingClosure(FileList fileList) throws InterruptedException {
+    long sleepTime = 1000L;
+    int iter = 0;
+    while (!fileList.isStreamingClosedProperly()) {
+      Thread.sleep(sleepTime);
+      iter++;
+      if (iter == 5) {
+        throw new IllegalStateException("File Streamer not closed within 5s.");
+      }
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 110b335896..3af74ba281 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -148,6 +148,13 @@ public static synchronized ReplChangeManager getInstance(Configuration conf)
     return instance;
   }

+  public static synchronized ReplChangeManager getInstance() {
+    if (!inited) {
+      throw new IllegalStateException("Replication Change Manager is not initialized.");
+    }
+    return instance;
+  }
+
   private ReplChangeManager(Configuration conf) throws MetaException {
     try {
       if (!inited) {
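externalTableCopyTasks drains the iterator of serialized external table copy locations only while the TaskTracker still has capacity, so a single load cycle stays bounded and leftover entries are picked up in the next cycle. A minimal standalone sketch of that bounded-drain pattern, with BatchTracker and drainBatch as illustrative stand-ins rather than the Hive classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BoundedDrainExample {
  static class BatchTracker {
    private final int maxTasks;
    private int added = 0;
    BatchTracker(int maxTasks) { this.maxTasks = maxTasks; }
    boolean canAddMoreTasks() { return added < maxTasks; }
    void addTask() { added++; }
  }

  // Consume from the shared iterator only while the tracker has room; leftovers stay for the next cycle.
  static List<String> drainBatch(Iterator<String> pending, BatchTracker tracker) {
    List<String> batch = new ArrayList<>();
    while (pending.hasNext() && tracker.canAddMoreTasks()) {
      batch.add(pending.next());
      tracker.addTask();
    }
    return batch;
  }

  public static void main(String[] args) {
    Iterator<String> pending = Arrays.asList("loc1", "loc2", "loc3").iterator();
    System.out.println(drainBatch(pending, new BatchTracker(2))); // [loc1, loc2]
    System.out.println(drainBatch(pending, new BatchTracker(2))); // [loc3]
  }
}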
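FileList keeps dump copy entries in a bounded in-memory queue and, once the cache threshold is crossed, hands them to FileListStreamer, which appends everything to a backing file that can later be iterated back line by line. A simplified, single-threaded sketch of that spill-to-file idea using only JDK classes (SpillableStringList is an illustrative stand-in, not the Hive implementation):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SpillableStringList implements Iterable<String> {
  private final List<String> cache = new ArrayList<>();
  private final Path backingFile;
  private final int threshold;
  private BufferedWriter writer;
  private boolean spilled = false;

  public SpillableStringList(Path backingFile, int threshold) {
    this.backingFile = backingFile;
    this.threshold = threshold;
  }

  public void add(String entry) throws IOException {
    if (!spilled && cache.size() < threshold) {
      cache.add(entry);
      return;
    }
    if (!spilled) {
      writer = Files.newBufferedWriter(backingFile);
      for (String cached : cache) {   // drain the in-memory entries before switching to the file
        writer.write(cached);
        writer.newLine();
      }
      cache.clear();
      spilled = true;
    }
    writer.write(entry);
    writer.newLine();
  }

  public void close() throws IOException {
    if (writer != null) {
      writer.close();
    }
  }

  @Override
  public Iterator<String> iterator() {
    try {
      return spilled ? Files.readAllLines(backingFile).iterator() : cache.iterator();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

In the real classes the hand-off is asynchronous: add() blocks on a LinkedBlockingQueue and a daemon FileListStreamer thread drains it, which is why TestFileList waits on isStreamingInitialized() and isStreamingClosedProperly().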
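Entries stored in the file-backed list are plain strings, so ManagedTableCopyPath and DirCopyWork implement StringConvertibleObject to serialize themselves into one line ('#'-separated in ManagedTableCopyPath) and rebuild on the load side via loadFromString. A minimal round-trip sketch (CopyEntry and the sample paths are illustrative):

public class CopyEntry {
  private static final String SEPARATOR = "#";
  private String srcPath;
  private String tgtPath;

  // Serialize to a single line suitable for the file-backed list.
  public String convertToString() {
    return srcPath + SEPARATOR + tgtPath;
  }

  // Rebuild the object from one line read back at load time.
  public void loadFromString(String objectInStr) {
    String[] paths = objectInStr.split(SEPARATOR);
    this.srcPath = paths[0];
    this.tgtPath = paths[1];
  }

  public static void main(String[] args) {
    CopyEntry src = new CopyEntry();
    src.srcPath = "hdfs://src/warehouse/t1";
    src.tgtPath = "hdfs://dump/root/t1/data";

    CopyEntry restored = new CopyEntry();
    restored.loadFromString(src.convertToString());   // round trip through one line of text
    System.out.println(restored.srcPath + " -> " + restored.tgtPath);
  }
}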
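With hive.repl.data.copy.lazy enabled, the dump phase only records CM-encoded file URIs into a _files listing under the event or table data directory (writeEncodedDumpFiles, exportFilesAsList), and the actual data movement is deferred to the load-side copy tasks. A minimal sketch of writing such a listing with the Hadoop FileSystem API (the directory layout and entry format here are assumptions for illustration):

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileListingWriter {
  // Writes one encoded entry per line into <dataDumpDir>/_files.
  public static void writeListing(Path dataDumpDir, List<String> encodedFileUris, Configuration conf)
      throws IOException {
    Path filesPath = new Path(dataDumpDir, "_files");
    FileSystem fs = filesPath.getFileSystem(conf);
    try (BufferedWriter writer =
             new BufferedWriter(new OutputStreamWriter(fs.create(filesPath), StandardCharsets.UTF_8))) {
      for (String uri : encodedFileUris) {
        writer.write(uri);
        writer.newLine();
      }
    }
  }
}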
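exportFilesAsList wraps the listing write in the metastore Retry helper so that a transient IOException resets the FileSystem objects, deletes the partial _files output, and tries again. A generic standalone equivalent of that retry-with-cleanup pattern (runWithRetry and the attempt count are assumptions, not the Retry API):

import java.io.IOException;
import java.util.concurrent.Callable;

public final class RetryExample {
  // Runs the action up to maxAttempts times, invoking cleanup before each retry.
  static <T> T runWithRetry(Callable<T> action, Runnable cleanup, int maxAttempts) throws Exception {
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (IOException e) {
        last = e;
        cleanup.run();   // e.g. delete the partially written _files before retrying
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    String result = runWithRetry(() -> "exported", () -> System.out.println("cleanup"), 3);
    System.out.println(result);
  }
}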