diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a617a1af46..f60d099497 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -524,6 +524,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + "task increment that would cross the specified limit."), REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100, "Number of threads that will be used to dump partition data information during repl dump."), + REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false, + "Indicates whether replication should run data copy tasks during repl load operation."), + REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000, + "This config indicates threshold for the maximum number of data copy locations to be kept in memory. \n" + + "When the config 'hive.repl.data.copy.lazy' is set to true, this config is not considered."), REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to purge expired dump dirs."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 169fed857d..66b0d078c2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -256,15 +257,32 @@ private Tuple bootstrapLoadAndVerify(String dbName, String replDbName) throws IO return incrementalLoadAndVerify(dbName, replDbName); } + private Tuple bootstrapLoadAndVerify(String dbName, String replDbName, List withClauseOptions) + throws IOException { + return incrementalLoadAndVerify(dbName, replDbName, withClauseOptions); + } + private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException { Tuple dump = replDumpDb(dbName); loadAndVerify(replDbName, dbName, dump.lastReplId); return dump; } + private Tuple incrementalLoadAndVerify(String dbName, String replDbName, List withClauseOptions) + throws IOException { + Tuple dump = replDumpDb(dbName, withClauseOptions); + loadAndVerify(replDbName, dbName, dump.lastReplId, withClauseOptions); + return dump; + } + private Tuple replDumpDb(String dbName) throws IOException { + return replDumpDb(dbName, null); + } + + private Tuple replDumpDb(String dbName, List withClauseOptions) throws IOException { + String withClause = getWithClause(withClauseOptions); advanceDumpDir(); - String dumpCmd = "REPL DUMP " + dbName; + String dumpCmd = "REPL DUMP " + dbName + withClause; run(dumpCmd, driver); String dumpLocation = getResult(0, 0, driver); String lastReplId = getResult(0, 1, true, driver); @@ -272,8 +290,21 @@ private Tuple replDumpDb(String dbName) throws IOException { return new Tuple(dumpLocation, lastReplId); } + private String getWithClause(List withClauseOptions) { + if (withClauseOptions != null && !withClauseOptions.isEmpty()) { + return " with (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return ""; + } 
+ private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId) throws IOException { - run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror); + loadAndVerify(replDbName, sourceDbNameOrPattern, lastReplId, null); + } + + private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId, + List withClauseOptions) throws IOException { + String withClause = getWithClause(withClauseOptions); + run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName + withClause, driverMirror); verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror); return; } @@ -546,6 +577,61 @@ public void testBasicWithCM() throws Exception { verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); } + @Test + public void testBasicWithCMLazyCopy() throws Exception { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String replDbName = dbName + "_dupe"; + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); + + String lazyCopyClause = " with ('" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true')"; + + advanceDumpDir(); + run("REPL DUMP " + dbName + lazyCopyClause, driver); + String replDumpLocn = getResult(0,0, driver); + String replDumpId = getResult(0,1,true, driver); + + // Table dropped after "repl dump" + run("DROP TABLE " + dbName + ".unptned", driver); + + // Partition droppped after "repl dump" + run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver); + + run("REPL LOAD " + dbName + " INTO " + replDbName + lazyCopyClause, driverMirror); + verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror); + + verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", 
ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); + } + @Test public void testBootstrapLoadOnExistingDb() throws IOException { String testName = "bootstrapLoadOnExistingDb"; @@ -1559,6 +1645,76 @@ public void testIncrementalLoad() throws IOException { verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); } + @Test + public void testIncrementalLoadLazyCopy() throws IOException { + String testName = "testIncrementalLoadLazyCopy"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + List lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'"); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName, lazyCopyClause); + + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[] {}; + + String unptnLocn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName, lazyCopyClause); + + Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + 
".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); + + incrementalDump = incrementalLoadAndVerify(dbName, replDbName); + hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + } + @Test public void testIncrementalInserts() throws IOException { String testName = "incrementalInserts"; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 493a467ac6..04ead62aab 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -210,6 +210,66 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); } + @Test + public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable { + List lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'"); + //creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, lazyCopyClause); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); + + + + replica.load(replicatedDbName, primaryDbName, lazyCopyClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult("us") + .run("select country from t2 where country = 'france'") + .verifyResult("france") + .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"}); + + String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + // Ckpt should be set on bootstrapped db. 
+ replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation); + + assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); + assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("create external table t4 as select id from t3") + .dump(primaryDbName, lazyCopyClause); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); + + replica.load(replicatedDbName, primaryDbName, lazyCopyClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyResult("t3") + .run("select id from t3") + .verifyResult("10") + .run("select id from t4") + .verifyResult("10"); + } + /** * @param sourceTableName -- Provide the fully qualified table name * @param replicaTableName -- Provide the fully qualified table name diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index d3e94134ab..b81d96141a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -50,7 +50,7 @@ public int execute() { work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); - tableExport.write(true); + tableExport.write(true, null, false); } catch (Exception e) { LOG.error("failed", e); setException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index 46f9bb3add..04bbd564be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject; import org.apache.hadoop.hive.ql.plan.Explain; import java.io.Serializable; @@ -27,10 +28,14 @@ @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) -public class DirCopyWork implements Serializable { +public class DirCopyWork implements Serializable, StringConvertibleObject { + private static final String URI_SEPARATOR = "#"; private static final long serialVersionUID = 1L; - private final Path fullyQualifiedSourcePath; - private final Path fullyQualifiedTargetPath; + private Path fullyQualifiedSourcePath; + private Path fullyQualifiedTargetPath; + + public DirCopyWork() { + } public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; @@ -51,4 +56,20 @@ public Path getFullyQualifiedSourcePath() { public Path getFullyQualifiedTargetPath() { return fullyQualifiedTargetPath; } + + @Override + public String convertToString() { + StringBuilder objInStr = new StringBuilder(); + objInStr.append(fullyQualifiedSourcePath) + .append(URI_SEPARATOR) + .append(fullyQualifiedTargetPath); + return objInStr.toString(); + } + + @Override + public void loadFromString(String objectInStr) { + String paths[] = objectInStr.split(URI_SEPARATOR); + this.fullyQualifiedSourcePath = new Path(paths[0]); + this.fullyQualifiedTargetPath = 
new Path(paths[1]); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index b15b326b1c..8a56891c01 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,11 +41,11 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.metastore.utils.Retry; -import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; @@ -58,7 +58,6 @@ import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -92,7 +91,6 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.io.UnsupportedEncodingException; -import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -465,9 +463,13 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive String validTxnList = null; long waitUntilTime = 0; long bootDumpBeginReplId = -1; - List managedTableCopyPaths = Collections.emptyList(); - List extTableCopyWorks = Collections.emptyList(); + + int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + FileList managedTblList = dataCopyAtLoad ? null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize); List tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); + // If we are bootstrapping ACID tables, we need to perform steps similar to a regular // bootstrap (See bootstrapDump() for more details. Only difference here is instead of // waiting for the concurrent transactions to finish, we start dumping the incremental events @@ -560,8 +562,6 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } Path dbRootMetadata = new Path(metadataPath, dbName); Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); - managedTableCopyPaths = new ArrayList<>(); - List extTableLocations = new LinkedList<>(); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { try { @@ -570,15 +570,14 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump external table locations if required. 
if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation()) { - extTableLocations.addAll(writer.dataLocationDump(table)); + writer.dataLocationDump(table, extTableFileList, conf); } // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, - hiveDb, tableTuple)); + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); } if (tableList != null && isTableSatifiesConfig(table)) { tableList.add(tableName); @@ -591,14 +590,25 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); - extTableCopyWorks = dirLocationsToCopy(extTableLocations); } - work.setDirCopyIterator(extTableCopyWorks.iterator()); - work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); + setDataCopyIterators(extTableFileList, managedTblList); work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); return lastReplId; } + private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) throws IOException { + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + extTableFileList.close(); + work.setExternalTblCopyPathIterator(extTableFileList); + if (dataCopyAtLoad) { + work.setManagedTableCopyPathIterator(Collections.emptyList().iterator()); + LOG.info("Deferring table/partition data copy during dump. It should be done at load."); + } else { + managedTableFileList.close(); + work.setManagedTableCopyPathIterator(managedTableFileList); + } + } + private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) { ReplicationMetricCollector collector; if (isBootstrap) { @@ -750,23 +760,6 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } - private List dirLocationsToCopy(List sourceLocations) - throws HiveException { - if (sourceLocations.isEmpty()) { - return Collections.emptyList(); - } - List list = new ArrayList<>(sourceLocations.size()); - String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); - // this is done to remove any scheme related information that will be present in the base path - // specifically when we are replicating to cloud storage - Path basePath = ReplExternalTables.getExternalTableBaseDir(conf); - for (Path sourcePath : sourceLocations) { - Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); - list.add(new DirCopyWork(sourcePath, targetPath)); - } - return list; - } - Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { // bootstrap case @@ -778,8 +771,10 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) List tableList; LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); - List extTableCopyWorks = new ArrayList<>(); - List managedTableCopyPaths = new ArrayList<>(); + int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + FileList managedTblList = dataCopyAtLoad ? 
null : createTableFileList(dumpRoot, EximUtil.FILES_NAME, cacheSize); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILES_NAME_EXTERNAL, cacheSize); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -829,11 +824,11 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { LOG.debug("Adding table {} to external tables list", tblName); - extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); + writer.dataLocationDump(tableTuple.object, extTableFileList, conf); } - managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, + dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, bootDumpBeginReplId, - hiveDb, tableTuple)); + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. @@ -845,7 +840,6 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); - extTableCopyWorks = dirLocationsToCopy(extTableLocations); } catch (Exception e) { caught = e; } finally { @@ -873,12 +867,16 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); dmd.write(true); - - work.setDirCopyIterator(extTableCopyWorks.iterator()); - work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); + setDataCopyIterators(extTableFileList, managedTblList); return bootDumpBeginReplId; } + private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) throws IOException { + Path backingFile = new Path(dumpRoot, fileName); + return work.getFileList(backingFile, cacheSize, conf, true); + } + + private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { try { return dumpMetaData.getEventFrom() != null; @@ -921,9 +919,10 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive return dbRoot; } - List dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, Path dbRootData, long lastReplId, Hive hiveDb, - HiveWrapper.Tuple
<Table> tuple) throws Exception { + HiveWrapper.Tuple<Table>
tuple, FileList managedTbleList, boolean dataCopyAtLoad) + throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = @@ -941,16 +940,18 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - List managedTableCopyPaths = new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, - conf, mmCtx).write(false); + new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write( + false, managedTbleList, dataCopyAtLoad); work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); + } + + private boolean dataCopyRequired(TableSpec tableSpec) { if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) || Utils.shouldDumpMetaDataOnly(conf)) { - return Collections.emptyList(); + return false; } - return managedTableCopyPaths; + return true; } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index bccaf9417b..718a611682 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -24,14 +24,17 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -53,12 +56,14 @@ private static boolean testInjectDumpDirAutoIncrement = false; static boolean testDeletePreviousDumpMetaPath = false; private Integer maxEventLimit; - private transient Iterator dirCopyIterator; - private transient Iterator managedTableCopyPathIterator; + private transient Iterator externalTblCopyPathIterator; + private transient Iterator managedTblCopyPathIterator; private Path currentDumpPath; private List resultValues; private boolean shouldOverwrite; private transient ReplicationMetricCollector metricCollector; + private ReplicationSpec replicationSpec; + private FileList mockedFileList; public static void injectNextDumpDirForTest(String dumpDir) { injectNextDumpDirForTest(dumpDir, false); @@ -130,22 +135,22 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception } } - public void setDirCopyIterator(Iterator dirCopyIterator) { - if (this.dirCopyIterator != null) { - throw new IllegalStateException("Dir Copy iterator has already been initialized"); + public void setExternalTblCopyPathIterator(Iterator externalTblCopyPathIterator) { + if (this.externalTblCopyPathIterator != null) { + throw new IllegalStateException("External table copy path 
iterator has already been initialized"); } - this.dirCopyIterator = dirCopyIterator; + this.externalTblCopyPathIterator = externalTblCopyPathIterator; } - public void setManagedTableCopyPathIterator(Iterator managedTableCopyPathIterator) { - if (this.managedTableCopyPathIterator != null) { + public void setManagedTableCopyPathIterator(Iterator managedTblCopyPathIterator) { + if (this.managedTblCopyPathIterator != null) { throw new IllegalStateException("Managed table copy path iterator has already been initialized"); } - this.managedTableCopyPathIterator = managedTableCopyPathIterator; + this.managedTblCopyPathIterator = managedTblCopyPathIterator; } public boolean tableDataCopyIteratorsInitialized() { - return dirCopyIterator != null || managedTableCopyPathIterator != null; + return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null; } public Path getCurrentDumpPath() { @@ -169,8 +174,9 @@ public void setResultValues(List resultValues) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) { - DirCopyWork dirCopyWork = dirCopyIterator.next(); + while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + DirCopyWork dirCopyWork = new DirCopyWork(); + dirCopyWork.loadFromString(externalTblCopyPathIterator.next()); Task task = TaskFactory.get(dirCopyWork, conf); tasks.add(task); tracker.addTask(task); @@ -184,8 +190,12 @@ public void setResultValues(List resultValues) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { - EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next(); + while (managedTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + ReplicationSpec replSpec = new ReplicationSpec(); + replSpec.setIsReplace(true); + replSpec.setInReplicationScope(true); + EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec); + managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next()); Task copyTask = ReplCopyTask.getLoadCopyTask( managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite); @@ -207,4 +217,20 @@ public ReplicationMetricCollector getMetricCollector() { public void setMetricCollector(ReplicationMetricCollector metricCollector) { this.metricCollector = metricCollector; } + + public ReplicationSpec getReplicationSpec() { + return replicationSpec; + } + + public void setReplicationSpec(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + + public FileList getFileList(Path backingFile, int cacheSize, HiveConf conf, boolean b) throws IOException { + return (mockedFileList == null) ? 
new FileList(backingFile, cacheSize, conf, true) : mockedFileList; + } + + public void setMockedFileList(FileList mockedFileList) { + this.mockedFileList = mockedFileList; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index a47a728a77..b6e085891d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -45,7 +46,6 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -135,10 +135,10 @@ private boolean shouldWrite() { * table if the table is partitioned and the partition location is outside the table. * It returns list of all the external table locations. */ - List dataLocationDump(Table table) throws InterruptedException, IOException, HiveException { - List extTableLocations = new LinkedList<>(); + void dataLocationDump(Table table, FileList fileList, HiveConf conf) + throws InterruptedException, IOException, HiveException { if (!shouldWrite()) { - return extTableLocations; + return; } if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { throw new IllegalArgumentException( @@ -148,7 +148,7 @@ private boolean shouldWrite() { Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - extTableLocations.add(fullyQualifiedDataLocation); + dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); if (table.isPartitioned()) { List partitions; try { @@ -157,7 +157,7 @@ private boolean shouldWrite() { if (e.getCause() instanceof NoSuchObjectException) { // If table is dropped when dump in progress, just skip partitions data location dump LOG.debug(e.getMessage()); - return extTableLocations; + return; } throw e; } @@ -170,11 +170,17 @@ private boolean shouldWrite() { fullyQualifiedDataLocation = PathBuilder .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - extTableLocations.add(fullyQualifiedDataLocation); + dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); } } } - return extTableLocations; + } + + private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf) + throws HiveException { + Path basePath = getExternalTableBaseDir(conf); + Path targetPath = externalTableDataPath(conf, basePath, sourcePath); + fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString()); } private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index a48cd5b3fc..e6aefae26f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -48,12 +48,14 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; @@ -182,6 +184,7 @@ private int executeBootStrapLoad() throws Exception { Context loadContext = new Context(work.dumpDirectory, conf, getHive(), work.sessionStateLineageState, context); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); + addLazyDataCopyTask(loadTaskTracker); /* for now for simplicity we are doing just one directory ( one database ), come back to use of multiple databases once we have the basic flow to chain creating of tasks in place for @@ -330,6 +333,20 @@ a database ( directory ) return 0; } + private void addLazyDataCopyTask(TaskTracker loadTaskTracker) { + boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + if (dataCopyAtLoad) { + if (work.getExternalTableDataCopyItr() == null) { + Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILES_NAME_EXTERNAL); + work.setExternalTableDataCopyItr(new FileList(extTableBackingFile, conf)); + } + if (childTasks == null) { + childTasks = new ArrayList<>(); + } + childTasks.addAll(work.externalTableCopyTasks(loadTaskTracker, conf)); + } + } + private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker, BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker) throws Exception { @@ -545,6 +562,7 @@ private int executeIncrementalLoad() throws Exception { List> childTasks = new ArrayList<>(); int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); TaskTracker tracker = new TaskTracker(maxTasks); + addLazyDataCopyTask(tracker); childTasks.add(builder.build(context, getHive(), LOG, tracker)); // If there are no more events to be applied, add a task to update the last.repl.id of the // target database to the event id of the last event considered by the dump. 
Next diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 43bf365b4f..59fef1cebc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -22,26 +22,35 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ReplLoadWork implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(ReplLoadWork.class); final String dbNameToLoadIn; final ReplScope currentReplScope; final String dumpDirectory; @@ -56,6 +65,7 @@ private final transient BootstrapEventsIterator bootstrapIterator; private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder; private transient Task rootTask; + private Iterator externalTableDataCopyItr; /* these are sessionState objects that are copied over to work to allow for parallel execution. 
@@ -176,4 +186,28 @@ public ReplicationMetricCollector getMetricCollector() { public Long getDumpExecutionId() { return dumpExecutionId; } + + public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { + return Collections.emptyList(); + } + List> tasks = new ArrayList<>(); + while (externalTableDataCopyItr.hasNext() && tracker.canAddMoreTasks()) { + DirCopyWork dirCopyWork = new DirCopyWork(); + dirCopyWork.loadFromString(externalTableDataCopyItr.next()); + Task task = TaskFactory.get(dirCopyWork, conf); + tasks.add(task); + tracker.addTask(task); + LOG.debug("Added task for {}", dirCopyWork); + } + return tasks; + } + + public Iterator getExternalTableDataCopyItr() { + return externalTableDataCopyItr; + } + + public void setExternalTableDataCopyItr(Iterator externalTableDataCopyItr) { + this.externalTableDataCopyItr = externalTableDataCopyItr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 025457b73f..c4cfcf96f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -285,12 +285,12 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc : LoadFileType.OVERWRITE_EXISTING); stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); } - + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)), stagingDir, - context.hiveConf, false, false + context.hiveConf, copyAtLoad, false ); Task movePartitionTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 9e236fd697..35ea777ef5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -56,7 +57,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.BitSet; import java.util.Collections; import java.util.HashSet; @@ -298,8 +298,9 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent + table.getCompleteName() + " with source location: " + dataPath.toString() + " and target location " + tgtPath.toString()); + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, - false, false); + copyAtLoad, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java new file mode 100644 index 0000000000..695f41ceeb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.LinkedBlockingQueue; + + +/** + * A file backed list of Strings which is in-memory till the threshold. + */ +public class FileList implements Closeable, Iterator { + private static final Logger LOG = LoggerFactory.getLogger(FileList.class); + private static int fileListStreamerID = 0; + private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; + + private LinkedBlockingQueue cache; + private volatile boolean thresholdHit = false; + private int thresholdPoint; + private float thresholdFactor = 0.9f; + private Path backingFile; + private FileListStreamer fileListStreamer; + private FileListOpMode fileListOpMode; + private String nextElement; + private boolean noMoreElement; + private HiveConf conf; + private BufferedReader backingFileReader; + private volatile boolean asyncMode; + + + /** + * To be used only for READ mode; + */ + public FileList(Path backingFile, HiveConf conf) { + this.backingFile = backingFile; + thresholdHit = true; + fileListOpMode = FileListOpMode.READ; + this.conf = conf; + } + + /** + * To be used only for WRITE mode; + */ + public FileList(Path backingFile, int cacheSize, HiveConf conf, boolean asyncMode) throws IOException { + this.cache = new LinkedBlockingQueue<>(cacheSize); + this.backingFile = backingFile; + fileListStreamer = new FileListStreamer(cache, backingFile, conf); + fileListOpMode = FileListOpMode.WRITE; + this.conf = conf; + thresholdPoint = getThreshold(cacheSize); + this.asyncMode = asyncMode; + } + + /** + * Only add operation is safe for concurrent operation. 
+ */ + public void add(String entry) throws SemanticException { + validateMode(FileListOpMode.WRITE); + if (!asyncMode) { + fileListStreamer.writeInThread(entry); + return; + } + if (thresholdHit && !fileListStreamer.isValid()) { + throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); + } + try { + cache.put(entry); + } catch (InterruptedException e) { + throw new SemanticException(e); + } + if (!thresholdHit && cache.size() > thresholdPoint) { + initStoreToFile(); + } + } + + /** + * Must be called before the list object can be used for read operation. + * @throws IOException + */ + @Override + public void close() throws IOException { + if (fileListOpMode == FileListOpMode.READ) { + if (backingFileReader != null) { + backingFileReader.close(); + } + } else { + fileListOpMode = FileListOpMode.CLOSING; + if (thresholdHit) { + fileListStreamer.close(); + } + fileListOpMode = FileListOpMode.READ; + } + } + + @Override + public boolean hasNext() { + validateMode(FileListOpMode.READ); + if (!thresholdHit) { + return !cache.isEmpty(); + } + if (nextElement != null) { + return true; + } + if (noMoreElement) { + return false; + } + nextElement = readNextLine(); + if (nextElement == null) { + noMoreElement = true; + } + return !noMoreElement; + } + + private String readNextLine() { + String nextElement = null; + try { + if (backingFileReader == null) { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + if (fs.exists(backingFile)) { + backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile))); + } + } + nextElement = (backingFileReader == null) ? null : backingFileReader.readLine(); + } catch (IOException e) { + LOG.error("Unable to read list from backing file " + backingFile, e); + } + return nextElement; + } + + @Override + public String next() { + validateMode(FileListOpMode.READ); + if (!hasNext()) { + throw new NoSuchElementException("No more element in the list backed by " + backingFile); + } + String retVal = nextElement; + nextElement = null; + return thresholdHit ? retVal : cache.poll(); + } + private synchronized void initStoreToFile() { + if (!thresholdHit) { + fileListStreamer.setName(getNextID()); + fileListStreamer.setDaemon(true); + fileListStreamer.start(); + thresholdHit = true; + LOG.info("Started streaming the list elements to file: {}", backingFile); + } + } + + private static String getNextID() { + if (Integer.MAX_VALUE == fileListStreamerID) { + //reset the counter + fileListStreamerID = 0; + } + fileListStreamerID++; + return FILE_LIST_STREAMER_PREFIX + fileListStreamerID; + } + + private void validateMode(FileListOpMode expectedMode) throws IllegalStateException { + if (!fileListOpMode.equals(expectedMode)) { + String logMessage = String.format("Invalid mode for File List, expected:%s, found:%s", + expectedMode, fileListOpMode); + throw new IllegalStateException(logMessage); + } + } + + public int getThreshold(int cacheSize) { + boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + return copyAtLoad ? 
0 : (int)(cacheSize * thresholdFactor); + } + + private enum FileListOpMode { + READ, + WRITE, + CLOSING + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java new file mode 100644 index 0000000000..ffdd0e16da --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class FileListStreamer extends Thread implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class); + private static final long TIMEOUT_IN_SECS = 5L; + private volatile boolean stop; + private final LinkedBlockingQueue cache; + private Path backingFile; + private Configuration conf; + private BufferedWriter backingFileWriter; + private volatile boolean valid = true; + private volatile boolean asyncMode = false; + private final Object COMPLETION_LOCK = new Object(); + private volatile boolean completed = false; + + + + public FileListStreamer(LinkedBlockingQueue cache, Path backingFile, Configuration conf) throws IOException { + this.cache = cache; + this.backingFile = backingFile; + this.conf = conf; + init(); + } + + private void init() throws IOException { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + backingFileWriter = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile, !asyncMode))); + LOG.info("Initialized a file based store to save a list at: {}, ayncMode:{}", backingFile, asyncMode); + } + + public boolean isValid() { + return valid; + } + + @Override + public void close() throws IOException { + if (!asyncMode) { + closeBackingFile(); + return; + } + stop = true; + synchronized (COMPLETION_LOCK) { + while (!completed && isValid()) { + try { + COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS)); + } catch (InterruptedException e) { + } + } + } + if (!isValid()) { + throw new IOException("File list is not in a valid state:" + backingFile); + } + LOG.info("Completed close for File List backed by ", backingFile); + } + + public synchronized void writeInThread(String nextEntry) throws SemanticException { + try { + backingFileWriter.write(nextEntry); + 
backingFileWriter.newLine(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + @Override + public void run() { + asyncMode = true; + boolean exThrown = false; + while (!exThrown && (!stop || !cache.isEmpty())) { + try { + String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS); + if (nextEntry != null) { + backingFileWriter.write(nextEntry); + backingFileWriter.newLine(); + LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile); + } + } catch (Exception iEx) { + if (!(iEx instanceof InterruptedException)) { + // not draining any more. Inform the producer to avoid OOM. + valid = false; + LOG.error("Exception while saving the list to file " + backingFile, iEx); + exThrown = true; + } + } + } + try{ + closeBackingFile(); + completed = true; + } finally { + synchronized (COMPLETION_LOCK) { + COMPLETION_LOCK.notify(); + } + } + LOG.info("Completed the file list streamer backed by: {}", backingFile); + } + + private void closeBackingFile() { + try { + backingFileWriter.close(); + LOG.debug("Closed the file list backing file: {}", backingFile); + } catch (IOException e) { + LOG.error("Exception while closing the file list backing file", e); + valid = false; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index bccf56af9e..ecf51a9f32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -55,7 +54,6 @@ import org.apache.thrift.TException; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java new file mode 100644 index 0000000000..8c8120ba6d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +public interface StringConvertibleObject { + + public String convertToString (); + + public void loadFromString (String objectInStr); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index fb6a38cd43..05c9c7aafd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -74,6 +75,7 @@ public static final String METADATA_NAME = "_metadata"; public static final String FILES_NAME = "_files"; + public static final String FILES_NAME_EXTERNAL = "_files_external"; public static final String DATA_PATH_NAME = "data"; public static final String METADATA_PATH_NAME = "metadata"; @@ -161,12 +163,17 @@ public void setOpenTxnTask(Task openTxnTask) { /** * Wrapper class for mapping source and target path for copying managed table data. */ - public static class ManagedTableCopyPath { + public static class ManagedTableCopyPath implements StringConvertibleObject { + private static final String URI_SEPARATOR = "#"; private ReplicationSpec replicationSpec; private static boolean nullSrcPathForTest = false; private Path srcPath; private Path tgtPath; + public ManagedTableCopyPath(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) { this.replicationSpec = replicationSpec; if (srcPath == null) { @@ -215,6 +222,26 @@ public static void setNullSrcPath(HiveConf conf, boolean aNullSrcPath) { nullSrcPathForTest = aNullSrcPath; } } + + @Override + public String convertToString() { + StringBuilder objInStr = new StringBuilder(); + objInStr.append(srcPath) + .append(URI_SEPARATOR) + .append(tgtPath); + return objInStr.toString(); + } + + @Override + public void loadFromString(String objectInStr) { + String paths[] = objectInStr.split(URI_SEPARATOR); + this.srcPath = new Path(paths[0]); + this.tgtPath = new Path(paths[1]); + } + + private String getEmptyOrString(String str) { + return (str == null) ? 
"" : str; + } } private EximUtil() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 9cd9137ce6..55c84277fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -501,8 +501,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), - isSkipTrash, needRecycle, copyToMigratedTxnTable, false); + isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -648,8 +649,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath, - x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, false); + x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 5c8d0edd77..8675ace7c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; -import java.text.Collator; import java.util.Map; /** @@ -54,6 +53,10 @@ private boolean isRepl = false; private boolean isMetadataOnlyForExternalTables = false; + public void setInReplicationScope(boolean inReplicationScope) { + isInReplicationScope = inReplicationScope; + } + // Key definitions related to replication. 
public enum KEY { REPL_SCOPE("repl.scope"), @@ -122,7 +125,7 @@ public ReplicationSpec(String fromId, String toId) { public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isReplace) { - this.isInReplicationScope = isInReplicationScope; + this.setInReplicationScope(isInReplicationScope); this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; @@ -133,15 +136,15 @@ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, public ReplicationSpec(Function keyFetcher) { String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString()); - this.isInReplicationScope = false; + this.setInReplicationScope(false); this.isMetadataOnly = false; this.specType = Type.DEFAULT; if (scope != null) { if (scope.equalsIgnoreCase("metadata")) { this.isMetadataOnly = true; - this.isInReplicationScope = true; + this.setInReplicationScope(true); } else if (scope.equalsIgnoreCase("all")) { - this.isInReplicationScope = true; + this.setInReplicationScope(true); } } this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString()); @@ -227,7 +230,7 @@ public boolean apply(@Nullable Partition partition) { private void init(ASTNode node){ // -> ^(TOK_REPLICATION $replId $isMetadataOnly) - isInReplicationScope = true; + setInReplicationScope(true); eventId = PlanUtils.stripQuotes(node.getChild(0).getText()); if ((node.getChildCount() > 1) && node.getChild(1).getText().toLowerCase().equals("metadata")) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 3de583276e..9462eadf0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; @@ -74,7 +75,8 @@ this.callersSession = SessionState.get(); } - List write(final ReplicationSpec forReplicationSpec, boolean isExportTask) + List write(final ReplicationSpec forReplicationSpec, boolean isExportTask, + FileList fileList, boolean dataCopyAtLoad) throws InterruptedException, HiveException { List> futures = new LinkedList<>(); List managedTableCopyPaths = new LinkedList<>(); @@ -114,16 +116,18 @@ String threadName = Thread.currentThread().getName(); LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName); try { - // Data Copy in case of ExportTask + // Data Copy in case of ExportTask or when dataCopyAtLoad is true List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); - Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName); + Path rootDataDumpDir = paths.partitionDataExportDir(partitionName); new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) - .export(isExportTask); + .export(isExportTask, dataCopyAtLoad); Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName); LOG.debug("Thread: {}, finish partition dump {}", threadName, 
partitionName); - return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), - dataDumpDir); + if (!(isExportTask || dataCopyAtLoad)) { + fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), + dataDumpDir).convertToString()); + } } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -132,11 +136,7 @@ consumer.shutdown(); for (Future future : futures) { try { - Object retVal = future.get(); - if (retVal != null) { - ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal; - managedTableCopyPaths.add(managedTableCopyPath); - } + future.get(); } catch (Exception e) { LOG.error("failed", e.getCause()); throw new HiveException(e.getCause().getMessage(), e.getCause()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index b11afe80a1..66cf494dd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -96,8 +97,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.mmCtx = mmCtx; } - public List write(boolean isExportTask) throws SemanticException { - List managedTableCopyPaths = Collections.emptyList(); + public void write(boolean isExportTask, FileList fileList, boolean dataCopyAtLoad) throws SemanticException { if (tableSpec == null) { writeMetaData(null); } else if (shouldExport()) { @@ -105,12 +105,11 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly() && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { - managedTableCopyPaths = writeData(withPartitions, isExportTask); + writeData(withPartitions, isExportTask, fileList, dataCopyAtLoad); } } else if (isExportTask) { throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } - return managedTableCopyPaths; } private PartitionIterable getPartitions() throws SemanticException { @@ -161,26 +160,26 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio } } - private List writeData(PartitionIterable partitions, boolean isExportTask) + private void writeData(PartitionIterable partitions, boolean isExportTask, FileList fileList, boolean dataCopyAtLoad) throws SemanticException { - List managedTableCopyPaths = new ArrayList<>(); try { if (tableSpec.tableHandle.isPartitioned()) { if (partitions == null) { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.getTableName().getTable()); } - managedTableCopyPaths = new PartitionExport( - paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask, + fileList, dataCopyAtLoad); } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - managedTableCopyPaths.add(new 
ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(), - paths.dataExportDir())); + if (!(isExportTask || dataCopyAtLoad)) { + fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(), + paths.dataExportDir()).convertToString()); + } new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(isExportTask); + .export(isExportTask, (dataCopyAtLoad)); } - return managedTableCopyPaths; } catch (Exception e) { throw new SemanticException(e.getMessage(), e); } @@ -244,6 +243,10 @@ Path partitionMetadataExportDir(String partitionName) throws SemanticException { return exportDir(new Path(metadataExportRootDir(), partitionName)); } + Path partitionDataExportDir(String partitionName) throws SemanticException { + return exportDir(new Path(dataExportRootDir(), partitionName)); + } + /** * Access to the {@link #metadataExportRootDir} should only be done via this method * since the creation of the directory is delayed until we figure out if we want diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index 82bf384a74..e758c8da53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -38,7 +38,9 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; @@ -88,6 +90,23 @@ public long toEventId() { return event.getEventId(); } + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { + Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); + FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + protected void writeEncodedDumpFiles(Context withinContext, Iterable files, Path dataPath) + throws IOException { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { + for (String file : files) { + fileListWriter.write(file); + fileListWriter.newLine(); + } + } + } + protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext) throws IOException, LoginException, MetaException, HiveFatalException { HiveConf hiveConf = withinContext.hiveConf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 6462e17515..3120c960e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import java.io.File; import java.util.Iterator; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -95,6 +96,7 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec, withinContext.hiveConf); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Iterator 
partitionFilesIter = apm.getPartitionFilesIter().iterator(); // We expect one to one mapping between partitions and file iterators. For external table, this @@ -103,9 +105,15 @@ public void handle(Context withinContext) throws Exception { for (Partition qlPtn : qlPtns) { Iterable files = partitionFilesIter.next().getFiles(); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, qlPtn, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path ptnDataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtn.getName()); + writeEncodedDumpFiles(withinContext, files, ptnDataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, qlPtn, file, withinContext); + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 36369db69d..fcb919a3e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import javax.security.auth.login.LoginException; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -56,11 +57,17 @@ CommitTxnMessage eventMessage(String stringRepresentation) { return deserializer.getCommitTxnMessage(stringRepresentation); } - private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable files, Context withinContext) + private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable files, Context withinContext, + Path dataPath) throws IOException, LoginException, MetaException, HiveFatalException { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, ptn, file, withinContext); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, ptn, file, withinContext); + } } } @@ -81,10 +88,13 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met withinContext.hiveConf); if ((null == qlPtns) || qlPtns.isEmpty()) { - writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext); + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { - writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext); + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtns.get(idx).getName()); + writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext, dataPath); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index c732b21cdc..ed78bc03c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.ql.metadata.Table; @@ -26,10 +26,6 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - class CreateTableHandler extends AbstractEventHandler { CreateTableHandler(NotificationEvent event) { @@ -79,23 +75,23 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec, withinContext.hiveConf); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); Iterable files = eventMessage.getFiles(); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, null, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, null, file, withinContext); + } } } withinContext.createDmd(this).write(); } - private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { - FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); - } - @Override public DumpType dumpType() { return DumpType.EVENT_CREATE_TABLE; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 09662dacf9..704b9960e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import java.io.File; import java.util.Collections; import java.util.List; @@ -76,6 +77,8 @@ public void handle(Context withinContext) throws Exception { withinContext.hiveConf); Iterable files = eventMessage.getFiles(); + boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY); + /* * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables. * But, Insert event is generated for each partition to which the data is inserted. @@ -83,9 +86,20 @@ public void handle(Context withinContext) throws Exception { */ Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? 
null : qlPtns.get(0); if (files != null) { - // encoded filename/checksum of files, write into _files - for (String file : files) { - writeFileEntry(qlMdTable, ptn, file, withinContext); + if (copyAtLoad) { + // encoded filename/checksum of files, write into _files + Path dataPath = null; + if ((null == qlPtns) || qlPtns.isEmpty()) { + dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + } else { + dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator + + qlPtns.get(0).getName()); + } + writeEncodedDumpFiles(withinContext, files, dataPath); + } else { + for (String file : files) { + writeFileEntry(qlMdTable, ptn, file, withinContext); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 5754bf2656..929d0c437a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStreamWriter; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -28,14 +30,20 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; +import org.apache.hadoop.hive.shims.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +76,11 @@ public FileOperations(List dataPathList, Path exportRootDataDir, String di exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } - public void export(boolean isExportTask) throws Exception { + public void export(boolean isExportTask, boolean dataCopyAtLoad) throws Exception { if (isExportTask) { copyFiles(); + } else if (dataCopyAtLoad) { + exportFilesAsList(); } else { validateSrcPathListExists(); } @@ -165,4 +175,92 @@ private void validateSrcPathListExists() throws IOException, LoginException { throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); } } + + /** + * This needs the root data directory to which the data needs to be exported to. + * The data export here is a list of files either in table/partition that are written to the _files + * in the exportRootDataDir provided. + */ + private void exportFilesAsList() throws SemanticException, IOException, LoginException { + if (dataPathList.isEmpty()) { + return; + } + boolean done = false; + int repeat = 0; + while (!done) { + // This is only called for replication that handles MM tables; no need for mmCtx. 
+ try (BufferedWriter writer = writer()) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + done = true; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); + } + repeat++; + logger.info("writeFilesList failed", e); + if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); + } + + int sleepTime = FileUtils.getSleepTime(repeat - 1); + logger.info("sleep for {} milliseconds for retry num {}", sleepTime, repeat); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + logger.info("thread sleep interrupted: {}", timerEx.getMessage()); + } + + // in case of io error, reset the file system object + FileSystem.closeAllForUGI(Utils.getUGI()); + dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); + if (exportFileSystem.exists(exportPath)) { + exportFileSystem.delete(exportPath, true); + } + } + } + } + + private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) + throws IOException { + ReplChangeManager replChangeManager = ReplChangeManager.getInstance(); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isDirectory()) { + // Write files inside the sub-directory. + Path subDir = fileStatus.getPath(); + writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir)); + } else { + writer.write(encodedUri(replChangeManager, fileStatus, encodedSubDirs)); + writer.newLine(); + } + } + } + + private BufferedWriter writer() throws IOException { + Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); + logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile); + return new BufferedWriter( + new OutputStreamWriter(exportFileSystem.create(exportToFile)) + ); + } + + private String encodedSubDir(String encodedParentDirs, Path subDir) { + if (null == encodedParentDirs) { + return subDir.getName(); + } else { + return encodedParentDirs + Path.SEPARATOR + subDir.getName(); + } + } + + private String encodedUri(ReplChangeManager replChangeManager, FileStatus fileStatus, String encodedSubDir) + throws IOException { + Path currentDataFilePath = fileStatus.getPath(); + String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem); + return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 8454b9c420..f90f1eb284 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -21,9 +21,9 @@ import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.metadata.Hive; import 
org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; @@ -121,8 +121,8 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw when(queryState.getConf()).thenReturn(conf); when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L); when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false); - when(HiveConf.getVar(conf, - HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h"); + when(HiveConf.getVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h"); + when(conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE)).thenReturn(10000); whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class)); whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class)); @@ -131,16 +131,15 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw private int tableDumpCount = 0; @Override - List dumpTable(String dbName, String tblName, String validTxnList, - Path dbRootMetadata, Path dbRootData, - long lastReplId, Hive hiveDb, - HiveWrapper.Tuple
tuple) + void dumpTable(String dbName, String tblName, String validTxnList, + Path dbRootMetadata, Path dbRootData, + long lastReplId, Hive hiveDb, + HiveWrapper.Tuple
tuple, FileList managedTableDirFileList, boolean dataCopyAtLoad) throws Exception { tableDumpCount++; if (tableDumpCount > 1) { throw new TestException(); } - return Collections.emptyList(); } }; @@ -148,6 +147,7 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw ReplDumpWork replDumpWork = new ReplDumpWork(replScope, null, "", ""); replDumpWork.setMetricCollector(metricCollector); + replDumpWork.setMockedFileList(mock(FileList.class)); task.setWork(replDumpWork); try { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 110b335896..3af74ba281 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -148,6 +148,13 @@ public static synchronized ReplChangeManager getInstance(Configuration conf) return instance; } + public static synchronized ReplChangeManager getInstance() { + if (!inited) { + throw new IllegalStateException("Replication Change Manager is not initialized."); + } + return instance; + } + private ReplChangeManager(Configuration conf) throws MetaException { try { if (!inited) {
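
Illustrative sketch (not part of the patch): the new classes above are expected to compose roughly as follows. A ManagedTableCopyPath is serialized via convertToString() and handed to a FileListStreamer, which persists one entry per line into its backing file; the load side rebuilds the object with loadFromString(). Everything below is a hedged example rather than the patch's own code: the driver class, the /tmp and /warehouse paths, the String type argument of the queue, and the no-arg ReplicationSpec constructor are assumptions made for illustration.

// Hypothetical driver, for illustration only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.repl.util.FileListStreamer;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

import java.util.concurrent.LinkedBlockingQueue;

public class FileListStreamerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Dump side: serialize a source/target pair for a managed table (paths are made up).
    EximUtil.ManagedTableCopyPath copyPath = new EximUtil.ManagedTableCopyPath(
        new ReplicationSpec(),                                   // assumes the existing no-arg constructor
        new Path("/warehouse/src_db.db/t1"),
        new Path("/repl/dump/next/src_db/t1/data"));
    String entry = copyPath.convertToString();                   // "<srcPath>#<tgtPath>"

    // Synchronous mode: the caller writes entries itself and close() simply closes the backing file.
    LinkedBlockingQueue<String> syncCache = new LinkedBlockingQueue<>(10000);
    FileListStreamer syncStreamer =
        new FileListStreamer(syncCache, new Path("/tmp/repl/_file_list_sync"), conf);
    syncStreamer.writeInThread(entry);
    syncStreamer.close();

    // Asynchronous mode: start() runs the draining loop (run() flips asyncMode), queued entries
    // are flushed to the backing file, and close() waits until the queue is drained or the
    // streamer becomes invalid.
    LinkedBlockingQueue<String> asyncCache = new LinkedBlockingQueue<>(10000);
    FileListStreamer asyncStreamer =
        new FileListStreamer(asyncCache, new Path("/tmp/repl/_file_list_async"), conf);
    asyncStreamer.start();
    asyncCache.put(entry);
    asyncStreamer.close();

    // Load side: rebuild the copy-path object from the persisted line.
    EximUtil.ManagedTableCopyPath loaded = new EximUtil.ManagedTableCopyPath(new ReplicationSpec());
    loaded.loadFromString(entry);
  }
}

The bounded queue is presumably what keeps the dump's in-memory footprint near the REPL_FILE_LIST_CACHE_SIZE threshold while the streamer drains entries to disk; when dataCopyAtLoad is true the ManagedTableCopyPath entries are not added to the list at all, and the _files written by exportFilesAsList() and writeEncodedDumpFiles() drive the copy at load time instead.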