diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index efae01a812..8c25755fd3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.junit.Assert; @@ -531,9 +532,9 @@ public static void assertExternalFileInfo(WarehouseInstance warehouseInstance, BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); Set tableNames = new HashSet<>(); for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] components = line.split(","); - Assert.assertEquals("The file should have tableName,base64encoded(data_location)", - 2, components.length); + String[] components = line.split(DirCopyWork.URI_SEPARATOR); + Assert.assertEquals("The file should have tableName#sourceLocation#targetLocation", + 3, components.length); tableNames.add(components[0]); Assert.assertTrue(components[1].length() > 0); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 84699ce192..f68af22f37 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -69,7 +69,6 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -2229,13 +2228,7 @@ private void assertExternalFileInfo(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); - } else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); - } + Path externalTableInfoFile = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index 0dfea73ba9..d68c4db546 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -43,8 +43,6 @@ import java.util.Map; import java.util.Set; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; - /** * Test replication scenarios with staging on replica. */ @@ -548,13 +546,7 @@ private void assertExternalFileInfo(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); - } else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); - } + Path externalTableInfoFile = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 2b04ba66db..0ecb40e257 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -72,7 +72,6 @@ import javax.annotation.Nullable; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -126,7 +125,7 @@ public void replicationWithoutExternalTables() throws Throwable { Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName)))); + .exists(new Path(metadataPath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) @@ -148,7 +147,7 @@ public void replicationWithoutExternalTables() throws Throwable { metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(null)))); + .exists(new Path(metadataPath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -774,7 +773,7 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(null)))); + .exists(new Path(metadataPath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -796,10 +795,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .run("create table t4 as select * from t3") .dump(primaryDbName, dumpWithClause); - // the 
_external_tables_file info should be created as external tables are to be replicated. + // the _file_list_external info should be created as external tables are to be replicated. Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hivePath + relativeExtInfoPath(null)))); + .exists(new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL))); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation, true); @@ -1314,21 +1313,7 @@ private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath + relativeExtInfoPath(dbName)); - } else { - externalTableInfoFile = new Path(metadataPath + relativeExtInfoPath(dbName)); - } + Path externalTableInfoFile = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); } - - private String relativeExtInfoPath(String dbName) { - if (dbName == null) { - return File.separator + FILE_NAME; - } else { - return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; - } - } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index 357302c809..2cd7482385 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -48,7 +48,6 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_HIVE_BASE_DIR; import static org.junit.Assert.assertFalse; @@ -105,7 +104,7 @@ public void replicationWithoutExternalTables() throws Throwable { // the _external_tables_file info only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("repl status " + replicatedDbName) @@ -125,7 +124,7 @@ public void replicationWithoutExternalTables() throws Throwable { // the _external_tables_file data only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -149,7 +148,7 @@ public void externalTableReplicationWithDefaultPaths() throws 
Throwable { .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is not written as metadata only replication - assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), EximUtil.FILE_LIST_EXTERNAL)); List withClauseOptions = ReplicationTestUtils.includeExternalTableClause(true); @@ -178,7 +177,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for incremental - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -194,7 +193,7 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for incremental - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL)); } @Test @@ -267,7 +266,7 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='india') values ('bangalore')") .dumpWithCommand("repl dump " + primaryDbName); - assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -290,7 +289,7 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -372,7 +371,7 @@ public void externalTableIncrementalReplication() throws Throwable { .run("alter table t1 add partition(country='us')") .dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL)); // Add new data externally, to a partition, but under the partition level top directory // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. @@ -406,7 +405,7 @@ public void externalTableIncrementalReplication() throws Throwable { // Repl load with zero events but external tables location info should present. 
tuple = primary.dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileInfo(new Path(tuple.dumpLocation, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -457,7 +456,7 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { // the _external_tables_file info only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .status(replicatedDbName) @@ -485,11 +484,11 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { String hiveDumpDir = tuple.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t2", "t3"), - new Path(hiveDumpDir, FILE_NAME)); + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); // _bootstrap directory should be created as bootstrap enabled on external tables. @@ -584,7 +583,7 @@ public Table apply(@Nullable Table table) { // Only table t2 should exist in the data location list file. String hiveDumpDir = tupleInc.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; - assertFalseExternalFileInfo(new Path(hiveDumpDir, FILE_NAME)); + assertFalseExternalFileInfo(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index c4f2b5fe68..8f5ab53520 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -48,7 +48,6 @@ import java.io.InputStreamReader; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -512,11 +511,11 @@ public void testBootstrapExternalTablesWithIncludeAndExcludeList() throws Throwa Path metaDataPath = new Path(hiveDumpDir, EximUtil.METADATA_PATH_NAME); // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); // Verify that the external table info contains only table "a2". 
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME)); + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -552,11 +551,11 @@ public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); // Verify that the external table info contains only table "a2". ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(hiveDumpDir, FILE_NAME)); + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -701,11 +700,11 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); // Verify that the external table info contains table "a2" and "c2". ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c2"), - new Path(hiveDumpDir, FILE_NAME)); + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); // Verify if the expected tables are bootstrapped. 
verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index c232dfa0b0..e247d5a66b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -30,8 +30,9 @@ Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class DirCopyWork implements Serializable, StringConvertibleObject { - private static final String URI_SEPARATOR = "#"; + public static final String URI_SEPARATOR = "#"; private static final long serialVersionUID = 1L; + private String tableName; private Path fullyQualifiedSourcePath; private Path fullyQualifiedTargetPath; private String dumpDirectory; @@ -42,7 +43,8 @@ public DirCopyWork(ReplicationMetricCollector metricCollector, String dumpDirect this.dumpDirectory = dumpDirectory; } - public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { + public DirCopyWork(String tableName, Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { + this.tableName = tableName; this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; this.fullyQualifiedTargetPath = fullyQualifiedTargetPath; } @@ -73,7 +75,9 @@ public String getDumpDirectory() { @Override public String convertToString() { StringBuilder objInStr = new StringBuilder(); - objInStr.append(fullyQualifiedSourcePath) + objInStr.append(tableName) + .append(URI_SEPARATOR) + .append(fullyQualifiedSourcePath) .append(URI_SEPARATOR) .append(fullyQualifiedTargetPath); return objInStr.toString(); @@ -82,7 +86,7 @@ public String convertToString() { @Override public void loadFromString(String objectInStr) { String paths[] = objectInStr.split(URI_SEPARATOR); - this.fullyQualifiedSourcePath = new Path(paths[0]); - this.fullyQualifiedTargetPath = new Path(paths[1]); + this.fullyQualifiedSourcePath = new Path(paths[1]); + this.fullyQualifiedTargetPath = new Path(paths[2]); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 8c2bd0543b..fa84d73c77 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.exec.util.Retryable; @@ -115,7 +116,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; @@ -624,9 +624,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive dmd.setReplScope(work.replScope); dmd.write(true); } - int cacheSize = 
conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); - try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); - FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL)) { // Examine all the tables if required. if (shouldExamineTablesToDump() || (tableList != null)) { // If required wait more for any transactions open at the time of starting the ACID bootstrap. @@ -648,35 +647,34 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive Path dbRootMetadata = new Path(metadataPath, dbName); Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - try (Writer writer = new Writer(dumpRoot, conf)) { - for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { - try { - Table table = hiveDb.getTable(dbName, tableName); + ReplExternalTables externalTablesWriter = new ReplExternalTables(conf); + for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { + try { + Table table = hiveDb.getTable(dbName, tableName); - // Dump external table locations if required. - if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) - && shouldDumpExternalTableLocation()) { - writer.dataLocationDump(table, extTableFileList, conf); - } + // Dump external table locations if required. + if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) + && shouldDumpExternalTableLocation()) { + externalTablesWriter.dataLocationDump(table, extTableFileList, conf); + } - // Dump the table to be bootstrapped if required. - if (shouldBootstrapDumpTable(table)) { - HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, - hiveDb, tableTuple, managedTblList, dataCopyAtLoad); - } - if (tableList != null && isTableSatifiesConfig(table)) { - tableList.add(tableName); - } - } catch (InvalidTableException te) { - // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. - LOG.debug(te.getMessage()); + // Dump the table to be bootstrapped if required. + if (shouldBootstrapDumpTable(table)) { + HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(table); + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); } + if (tableList != null && isTableSatifiesConfig(table)) { + tableList.add(tableName); + } + } catch (InvalidTableException te) { + // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. + LOG.debug(te.getMessage()); } } - dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); } + dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); setDataCopyIterators(extTableFileList, managedTblList); work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); return lastReplId; @@ -864,8 +862,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) } List functionsBinaryCopyPaths = Collections.emptyList(); int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); - try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); - FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL)) { for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); // TODO : Currently we don't support separate table list for each database. @@ -915,7 +913,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; - try (Writer writer = new Writer(dbRoot, conf)) { + try { + ReplExternalTables externalTablesWriter = new ReplExternalTables(conf); for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { Table table = null; try { @@ -933,7 +932,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { LOG.debug("Adding table {} to external tables list", tblName); - writer.dataLocationDump(tableTuple.object, extTableFileList, conf); + externalTablesWriter.dataLocationDump(tableTuple.object, extTableFileList, conf); } dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, bootDumpBeginReplId, @@ -998,9 +997,9 @@ private void updateReplSourceFor(Hive hiveDb, String dbName, Database db, hiveDb.alterDatabase(dbName, db); } - private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) { + private FileList createTableFileList(Path dumpRoot, String fileName) { Path backingFile = new Path(dumpRoot, fileName); - return new FileList(backingFile, cacheSize, conf); + return new FileList(backingFile, conf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 1f4e20de98..6c77f6ba05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.exec.repl; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -28,7 +27,6 @@ import 
org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -39,35 +37,93 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.Closeable; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.StringWriter; import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -/** - * Format of the file used to dump information about external tables: - *

- * table_name1,[base64Encoded(table_dir_location)]\n - * - * The file generated here is explicitly used for data copy of external tables and hence handling of - * writing this file is different than regular event handling for replication based on the conditions - * specified in {@link org.apache.hadoop.hive.ql.parse.repl.dump.Utils#shouldReplicate} - */ -public final class ReplExternalTables { +public class ReplExternalTables { private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class); - private static final String FIELD_SEPARATOR = ","; - public static final String FILE_NAME = "_external_tables_info"; + private HiveConf hiveConf; + private boolean includeExternalTables; + private boolean dumpMetadataOnly; + + public ReplExternalTables(HiveConf conf) { + this.hiveConf = conf; + this.includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); + this.dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); + } + + private boolean shouldWrite() { + return !dumpMetadataOnly && includeExternalTables; + } - private ReplExternalTables(){} + /** + * This will dump a single line per external table. It can include additional lines for the same + * table if the table is partitioned and the partition location is outside the table. + * It adds each such data location to the given file list. + */ + void dataLocationDump(Table table, FileList fileList, HiveConf conf) + throws IOException, HiveException { + if (!shouldWrite()) { + return; + } + if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + throw new IllegalArgumentException( + "only External tables can be written via this writer, provided table is " + table + .getTableType()); + } + Path fullyQualifiedDataLocation = + PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); + dirLocationToCopy(table.getTableName(), fileList, fullyQualifiedDataLocation, conf); + if (table.isPartitioned()) { + List partitions; + try { + partitions = Hive.get(hiveConf).getPartitions(table); + } catch (HiveException e) { + if (e.getCause() instanceof NoSuchObjectException) { + // If table is dropped when dump in progress, just skip partitions data location dump + LOG.debug(e.getMessage()); + return; + } + throw e; + } + + for (Partition partition : partitions) { + boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree( + partition.getDataLocation(), table.getDataLocation() + ); + if (partitionLocOutsideTableLoc) { + fullyQualifiedDataLocation = PathBuilder + .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); + dirLocationToCopy(table.getTableName(), fileList, fullyQualifiedDataLocation, conf); + } + } + } + } + + private void dirLocationToCopy(String tableName, FileList fileList, Path sourcePath, HiveConf conf) + throws HiveException { + Path basePath = getExternalTableBaseDir(conf); + Path targetPath = externalTableDataPath(conf, basePath, sourcePath); + //Here, when src and target are HA clusters with same NS, then sourcePath would have the correct host + //whereas the targetPath would have a host that refers to the target cluster. This is fine for + //data-copy running during dump as the correct logical locations would be used. But if data-copy runs during + //load, then the remote location needs to point to the src cluster from where the data would be copied and + //the common original NS would suffice for targetPath.
+ if(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE) && + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)) { + String remoteNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname); + if (StringUtils.isEmpty(remoteNS)) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid " + + remoteNS == null ? "null" : remoteNS, ReplUtils.REPL_HIVE_SERVICE)); + } + targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost())); + sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS)); + } + fileList.add(new DirCopyWork(tableName, sourcePath, targetPath).convertToString()); + } public static String externalTableLocation(HiveConf hiveConf, String location) throws SemanticException { Path basePath = getExternalTableBaseDir(hiveConf); @@ -110,234 +166,4 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path } return dataPath; } - - public static class Writer implements Closeable { - private static Logger LOG = LoggerFactory.getLogger(Writer.class); - private final HiveConf hiveConf; - private final Path writePath; - private final boolean includeExternalTables; - private final boolean dumpMetadataOnly; - private OutputStream writer; - - Writer(Path dbRoot, HiveConf hiveConf) throws IOException { - this.hiveConf = hiveConf; - writePath = new Path(dbRoot, FILE_NAME); - includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); - dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || - hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); - if (shouldWrite()) { - this.writer = writePath.getFileSystem(hiveConf).create(writePath); - } - } - - private boolean shouldWrite() { - return !dumpMetadataOnly && includeExternalTables; - } - - /** - * this will dump a single line per external table. it can include additional lines for the same - * table if the table is partitioned and the partition location is outside the table. - * It returns list of all the external table locations. 
- */ - void dataLocationDump(Table table, FileList fileList, HiveConf conf) - throws InterruptedException, IOException, HiveException { - if (!shouldWrite()) { - return; - } - if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - throw new IllegalArgumentException( - "only External tables can be writen via this writer, provided table is " + table - .getTableType()); - } - Path fullyQualifiedDataLocation = - PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); - write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); - if (table.isPartitioned()) { - List partitions; - try { - partitions = Hive.get(hiveConf).getPartitions(table); - } catch (HiveException e) { - if (e.getCause() instanceof NoSuchObjectException) { - // If table is dropped when dump in progress, just skip partitions data location dump - LOG.debug(e.getMessage()); - return; - } - throw e; - } - - for (Partition partition : partitions) { - boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree( - partition.getDataLocation(), table.getDataLocation() - ); - if (partitionLocOutsideTableLoc) { - fullyQualifiedDataLocation = PathBuilder - .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); - write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); - } - } - } - } - - private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf) - throws HiveException { - Path basePath = getExternalTableBaseDir(conf); - Path targetPath = externalTableDataPath(conf, basePath, sourcePath); - //Here, when src and target are HA clusters with same NS, then sourcePath would have the correct host - //whereas the targetPath would have an host that refers to the target cluster. This is fine for - //data-copy running during dump as the correct logical locations would be used. But if data-copy runs during - //load, then the remote location needs to point to the src cluster from where the data would be copied and - //the common original NS would suffice for targetPath. - if(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE) && - hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)) { - String remoteNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname); - if (StringUtils.isEmpty(remoteNS)) { - throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE - .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid " - + remoteNS == null ? 
"null" : remoteNS, ReplUtils.REPL_HIVE_SERVICE)); - } - targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost())); - sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS)); - } - fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString()); - } - - private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) - throws IOException, SemanticException { - StringWriter lineToWrite = new StringWriter(); - lineToWrite.append(tableName).append(FIELD_SEPARATOR); - Path dataLocation = - PathBuilder.fullyQualifiedHDFSUri(dataLoc, dataLoc.getFileSystem(hiveConf)); - byte[] encodedBytes = Base64.getEncoder() - .encode(dataLocation.toString().getBytes(StandardCharsets.UTF_8)); - String encodedPath = new String(encodedBytes, StandardCharsets.UTF_8); - lineToWrite.append(encodedPath).append("\n"); - return lineToWrite.toString(); - } - - private void write(String line) throws SemanticException { - Retryable retryable = Retryable.builder() - .withHiveConf(hiveConf) - .withRetryOnException(IOException.class).build(); - try { - retryable.executeCallable((Callable) () -> { - try { - writer.write(line.getBytes(StandardCharsets.UTF_8)); - } catch (IOException e) { - writer = openWriterAppendMode(); - throw e; - } - return null; - }); - } catch (Exception e) { - throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); - } - } - - private OutputStream openWriterAppendMode() { - try { - // not sure if the exception was due to a incorrect state within the writer hence closing it - close(); - return FileSystem.get(hiveConf).append(writePath); - } catch (IOException e1) { - String message = "there was an error to open the file {} in append mode"; - LOG.error(message, writePath.toString(), e1); - throw new IllegalStateException(message, e1); - } - } - - @Override - public void close() throws IOException { - if (writer != null) { - writer.close(); - } - } - } - - public static class Reader { - private static Logger LOG = LoggerFactory.getLogger(Reader.class); - private final HiveConf hiveConf; - private final Path rootReplLoadPath; - private final boolean isIncrementalPhase; - - public Reader(HiveConf conf, Path rootReplLoadPath, boolean isIncrementalPhase) { - this.hiveConf = conf; - this.rootReplLoadPath = rootReplLoadPath; - this.isIncrementalPhase = isIncrementalPhase; - } - - /** - * currently we only support dump/load of single db and the db Dump location cannot be inferred from - * the incoming dbNameOfPattern value since the load db name can be different from the target db Name - * hence traverse 1 level down from rootReplLoadPath to look for the file providing the hdfs locations. 
- */ - public Set sourceLocationsToCopy() throws IOException { - if (isIncrementalPhase) { - return sourceLocationsToCopy(new Path(rootReplLoadPath, FILE_NAME)); - } - - // this is bootstrap load path - Set locationsToCopy = new HashSet<>(); - FileSystem fileSystem = rootReplLoadPath.getFileSystem(hiveConf); - FileStatus[] fileStatuses = fileSystem.listStatus(rootReplLoadPath); - for (FileStatus next : fileStatuses) { - if (next.isDirectory()) { - Path externalTableInfoPath = new Path(next.getPath(), FILE_NAME); - locationsToCopy.addAll(sourceLocationsToCopy(externalTableInfoPath)); - } - } - return locationsToCopy; - } - - private BufferedReader reader(FileSystem fs, Path externalTableInfo) throws IOException { - InputStreamReader in = new InputStreamReader(fs.open(externalTableInfo), StandardCharsets.UTF_8); - return new BufferedReader(in); - } - - /** - * The SET of source locations should never be created based on the table Name in - * {@link #FILE_NAME} since there can be multiple entries for the same table in case the table is - * partitioned and the partitions are added by providing a separate Location for that partition, - * different than the table location. - */ - private Set sourceLocationsToCopy(Path externalTableInfo) throws IOException { - Set locationsToCopy = new HashSet<>(); - FileSystem fileSystem = externalTableInfo.getFileSystem(hiveConf); - if (!fileSystem.exists(externalTableInfo)) { - return locationsToCopy; - } - Retryable retryable = Retryable.builder() - .withHiveConf(hiveConf) - .withRetryOnException(IOException.class).build(); - try { - return retryable.executeCallable(() -> { - BufferedReader reader = null; - try { - reader = reader(fileSystem, externalTableInfo); - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] splits = line.split(FIELD_SEPARATOR); - locationsToCopy - .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8)); - } - return locationsToCopy; - } finally { - closeQuietly(reader); - } - }); - } catch (Exception e) { - throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); - } - } - - private static void closeQuietly(BufferedReader reader) { - try { - if (reader != null) { - reader.close(); - } - } catch (IOException e) { - LOG.debug("error while closing reader ", e); - } - } - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 2f416731ef..225d5af9a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -353,7 +353,7 @@ private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException if (dataCopyAtLoad) { if (work.getExternalTableDataCopyItr() == null) { Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILE_LIST_EXTERNAL); - try(FileList fileList = new FileList(extTableBackingFile, 0, conf)) { + try(FileList fileList = new FileList(extTableBackingFile, conf)) { work.setExternalTableDataCopyItr(fileList); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index 4cf316a42c..363ba1fbd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -124,7 +124,7 @@ public boolean hasNext() { LocatedFileStatus next = remoteIterator.next(); // we want to skip this file, this also means there cant be a table with name represented // by constant ReplExternalTables.FILE_NAME or ReplUtils.REPL_TABLE_LIST_DIR_NAME (_tables) - if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME) || + if(next.getPath().toString().endsWith(EximUtil.FILE_LIST_EXTERNAL) || next.getPath().toString().endsWith(ReplUtils.REPL_TABLE_LIST_DIR_NAME)) { continue; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java index 50c621b782..209c33b987 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java @@ -22,154 +22,118 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; - -/** - * A file backed list of Strings which is in-memory till the threshold. - */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private String nextElement = null; private HiveConf conf; private BufferedReader backingFileReader; + private BufferedWriter backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; - if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); - } else { - thresholdHit = true; - } } - @VisibleForTesting - FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue cache, HiveConf conf) { - this.backingFile = backingFile; - this.fileListStreamer = fileListStreamer; - this.cache = cache; - this.conf = conf; - thresholdPoint = getThreshold(cache.remainingCapacity()); - } - - /** - * Only add operation is safe for concurrent operations. 
- */ public void add(String entry) throws SemanticException { - if (thresholdHit && !fileListStreamer.isAlive()) { - throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); - } + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); try { - cache.put(entry); - } catch (InterruptedException e) { - throw new SemanticException(e); - } - if (!thresholdHit && cache.size() >= thresholdPoint) { - initStoreToFile(cache.size()); + retryable.executeCallable((Callable) ()-> { + synchronized (backingFile ) { + if (backingFileWriter == null) { + backingFileWriter = initWriter(); + } + backingFileWriter.write(entry); + backingFileWriter.newLine(); + } + LOG.info("Writing entry {} to file list backed by {}", entry, backingFile); + return null; + }); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(), + String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode()))); } } + BufferedWriter initWriter() throws IOException { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + return new BufferedWriter(new OutputStreamWriter(fs.create(backingFile))); + } + @Override public boolean hasNext() { - if (!thresholdHit) { - return (cache != null && !cache.isEmpty()); - } if (nextElement != null) { return true; + } else { + try { + nextElement = readNextLine(); + return (nextElement != null); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - if (noMoreElement) { - return false; - } - nextElement = readNextLine(); - if (nextElement == null) { - noMoreElement = true; - } - return !noMoreElement; } @Override public String next() { - if (!hasNext()) { + if (nextElement == null && !hasNext()) { throw new NoSuchElementException("No more element in the list backed by " + backingFile); } String retVal = nextElement; nextElement = null; - return thresholdHit ? retVal : cache.poll(); - } - - private synchronized void initStoreToFile(int cacheSize) { - if (!thresholdHit) { - fileListStreamer.setName(getNextID()); - fileListStreamer.setDaemon(true); - fileListStreamer.start(); - thresholdHit = true; - LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize); - } + return retVal; } - private String readNextLine() { - String nextElement = null; + private String readNextLine() throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); try { - if (backingFileReader == null) { - FileSystem fs = FileSystem.get(backingFile.toUri(), conf); - if (fs.exists(backingFile)) { + return retryable.executeCallable(() -> { + String nextElement; + if (backingFileReader == null) { + FileSystem fs = FileSystem.get(backingFile.toUri(), conf); + if(!fs.exists(backingFile)) { + return null; + } backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile))); } - } - nextElement = (backingFileReader == null) ? 
null : backingFileReader.readLine(); - } catch (IOException e) { - LOG.error("Unable to read list from backing file " + backingFile, e); + nextElement = backingFileReader.readLine(); + return nextElement; + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage(), + String.valueOf(ErrorMsg.getErrorMsg(e).getErrorCode()))); } - return nextElement; } @Override public void close() throws IOException { - if (thresholdHit && fileListStreamer != null) { - fileListStreamer.close(); - } if (backingFileReader != null) { backingFileReader.close(); } - LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit); - } - - private static String getNextID() { - if (Integer.MAX_VALUE == fileListStreamerID) { - //reset the counter - fileListStreamerID = 0; + if (backingFileWriter != null) { + backingFileWriter.close(); } - fileListStreamerID++; - return FILE_LIST_STREAMER_PREFIX + fileListStreamerID; - } - - public int getThreshold(int cacheSize) { - boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor); + LOG.info("Completed close for File List backed by:{}", backingFile); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java deleted file mode 100644 index 36fd0555f8..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.repl.util; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.Closeable; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Streams the entries from a cache to the backed file. 
- */ - -public class FileListStreamer extends Thread implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class); - private static final long TIMEOUT_IN_SECS = 5L; - private volatile boolean signalTostop; - private LinkedBlockingQueue cache; - private Path backingFile; - private Configuration conf; - private BufferedWriter backingFileWriter; - private volatile boolean valid = true; - private final Object COMPLETION_LOCK = new Object(); - private volatile boolean completed = false; - - public FileListStreamer(LinkedBlockingQueue cache, Path backingFile, Configuration conf) { - this.cache = cache; - this.backingFile = backingFile; - this.conf = conf; - } - - BufferedWriter lazyInitWriter() throws IOException { - FileSystem fs = FileSystem.get(backingFile.toUri(), conf); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile))); - LOG.info("Initialized a file based store to save a list at: {}", backingFile); - return writer; - } - - public boolean isValid() { - return valid; - } - - // Blocks for remaining entries to be flushed to file. - @Override - public void close() throws IOException { - signalTostop = true; - synchronized (COMPLETION_LOCK) { - while (motiveToWait()) { - try { - COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS)); - } catch (InterruptedException e) { - // no-op - } - } - } - if (!isValid()) { - throw new IOException("File list is not in a valid state:" + backingFile); - } - } - - private boolean motiveToWait() { - return !completed && valid; - } - - @Override - public void run() { - try { - backingFileWriter = lazyInitWriter(); - } catch (IOException e) { - valid = false; - throw new RuntimeException("Unable to initialize the file list streamer", e); - } - boolean exThrown = false; - while (!exThrown && (!signalTostop || !cache.isEmpty())) { - try { - String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS); - if (nextEntry != null) { - backingFileWriter.write(nextEntry); - backingFileWriter.newLine(); - LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile); - } - } catch (Exception iEx) { - if (!(iEx instanceof InterruptedException)) { - // not draining any more. Inform the producer to avoid OOM. 
- valid = false; - LOG.error("Exception while saving the list to file " + backingFile, iEx); - exThrown = true; - } - } - } - try{ - closeBackingFile(); - completed = true; - } finally { - synchronized (COMPLETION_LOCK) { - COMPLETION_LOCK.notify(); - } - } - LOG.info("Completed the file list streamer backed by: {}", backingFile); - } - - private void closeBackingFile() { - try { - backingFileWriter.close(); - LOG.debug("Closed the file list backing file: {}", backingFile); - } catch (IOException e) { - LOG.error("Exception while closing the file list backing file", e); - valid = false; - } - } - - @VisibleForTesting - boolean isInitialized() { - return backingFileWriter != null; - } -} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 6681102366..068002fb25 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -50,7 +50,6 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @RunWith(PowerMockRunner.class) @PrepareForTest({ Utils.class, ReplDumpTask.class}) @@ -127,8 +126,6 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false); when(HiveConf.getVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h"); when(conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE)).thenReturn(10000); - - whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class)); whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class)); ReplDumpTask task = new StubReplDumpTask() { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java index 67000a0cb3..948fe20236 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; +import java.io.File; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -51,52 +52,9 @@ @Mock private BufferedWriter bufferedWriter; - - @Test - public void testNoStreaming() throws Exception { - Object tuple[] = setupAndGetTuple(100, false); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - fileList.add("Entry1"); - fileList.add("Entry2"); - assertFalse(isStreamingToFile(fileListStreamer)); - } - - @Test - public void testAlwaysStreaming() throws Exception { - Object tuple[] = setupAndGetTuple(100, true); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - assertFalse(fileListStreamer.isInitialized()); - fileList.add("Entry1"); - waitForStreamingInitialization(fileListStreamer); - assertTrue(isStreamingToFile(fileListStreamer)); - fileList.close(); - waitForStreamingClosure(fileListStreamer); - } - - @Test - public void testStreaminOnCacheHit() throws Exception { - Object tuple[] = setupAndGetTuple(5, false); - 
FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - fileList.add("Entry1"); - fileList.add("Entry2"); - fileList.add("Entry3"); - Thread.sleep(5000L); - assertFalse(fileListStreamer.isInitialized()); - fileList.add("Entry4"); - fileList.add("Entry5"); - waitForStreamingInitialization(fileListStreamer); - fileList.close(); - waitForStreamingClosure(fileListStreamer); - } - @Test public void testConcurrentAdd() throws Exception { - Object tuple[] = setupAndGetTuple(100, false); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; + FileList fileList = setupFileList(false); int numOfEntries = 1000; int numOfThreads = 10; ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); @@ -111,54 +69,17 @@ public void testConcurrentAdd() throws Exception { }); } executorService.awaitTermination(1, TimeUnit.MINUTES); - waitForStreamingInitialization(fileListStreamer); fileList.close(); - waitForStreamingClosure(fileListStreamer); ArgumentCaptor entryArgs = ArgumentCaptor.forClass(String.class); Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture()); } - private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException { - long sleepTime = 1000L; - int iter = 0; - while (!fileListStreamer.isInitialized()) { - Thread.sleep(sleepTime); - iter++; - if (iter == 5) { - throw new IllegalStateException("File Streamer not initialized till 5s."); - } - } - } - - private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException { - long sleepTime = 1000L; - int iter = 0; - while (!isStreamingClosedProperly(fileListStreamer)) { - Thread.sleep(sleepTime); - iter++; - if (iter == 5) { - throw new IllegalStateException("File Streamer not getting closed till 5s."); - } - } - } - - private Object[] setupAndGetTuple(int cacheSize, boolean lazyDataCopy) throws Exception { + private FileList setupFileList(boolean lazyDataCopy) throws Exception { HiveConf hiveConf = Mockito.mock(HiveConf.class); Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)).thenReturn(lazyDataCopy); Path backingFile = new Path("/tmp/backingFile"); - LinkedBlockingQueue cache = new LinkedBlockingQueue<>(cacheSize); - FileListStreamer fileListStreamer = Mockito.spy(new FileListStreamer(cache, backingFile, hiveConf)); - FileList fileList = new FileList(backingFile, fileListStreamer, cache, hiveConf); - Mockito.doReturn(bufferedWriter).when(fileListStreamer).lazyInitWriter(); - Object[] tuple = new Object[] {fileList, fileListStreamer}; - return tuple; - } - - private boolean isStreamingToFile(FileListStreamer fileListStreamer) { - return fileListStreamer.isInitialized() && fileListStreamer.isAlive(); - } - - private boolean isStreamingClosedProperly(FileListStreamer fileListStreamer) { - return fileListStreamer.isInitialized() && !fileListStreamer.isAlive() && fileListStreamer.isValid(); + FileList fileList = Mockito.spy(new FileList(backingFile, hiveConf)); + Mockito.doReturn(bufferedWriter).when(fileList).initWriter(); + return fileList; } }
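
With this change each entry in `_file_list_external` is a single line of the form `tableName#sourceLocation#targetLocation`, produced by `DirCopyWork.convertToString()` and parsed back by `DirCopyWork.loadFromString()` (which only consumes the two path components). A minimal, self-contained sketch of that round trip follows; plain strings stand in for the `org.apache.hadoop.fs.Path` objects the real class uses, and the class/method names here are illustrative, not part of the patch.

```java
// Illustrative sketch of the tableName#sourceLocation#targetLocation line format
// written to _file_list_external by this patch.
public class ExternalTableFileListLine {
  static final String URI_SEPARATOR = "#";   // same separator DirCopyWork exposes

  // Rough equivalent of DirCopyWork.convertToString()
  static String toLine(String tableName, String source, String target) {
    return tableName + URI_SEPARATOR + source + URI_SEPARATOR + target;
  }

  public static void main(String[] args) {
    String line = toLine("t1",
        "hdfs://srcNs/warehouse/external/t1",
        "hdfs://tgtNs/replica/external/t1");
    // Rough equivalent of DirCopyWork.loadFromString(): index 0 is the table name,
    // indexes 1 and 2 are the fully qualified source and target locations.
    String[] parts = line.split(URI_SEPARATOR);
    System.out.println(parts[0] + " : " + parts[1] + " -> " + parts[2]);
  }
}
```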
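The rewritten `FileList` drops the in-memory cache and the `FileListStreamer` thread: every `add()` now writes a line straight to the backing file (wrapped in `Retryable` in the real code), and iteration simply reads the file back. Below is a dependency-free sketch of that write-through behaviour under those assumptions, leaving out the `HiveConf`, `Retryable`, and Hadoop `FileSystem` plumbing of the actual class.

```java
// Dependency-free sketch of the new write-through FileList behaviour.
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

class WriteThroughFileList implements AutoCloseable, Iterator<String> {
  private final Path backingFile;
  private BufferedWriter writer;
  private BufferedReader reader;
  private String nextElement;

  WriteThroughFileList(Path backingFile) {
    this.backingFile = backingFile;
  }

  // Every entry goes straight to the backing file; no cache or streamer thread.
  synchronized void add(String entry) throws IOException {
    if (writer == null) {
      writer = Files.newBufferedWriter(backingFile);
    }
    writer.write(entry);
    writer.newLine();
  }

  @Override
  public boolean hasNext() {
    if (nextElement != null) {
      return true;
    }
    try {
      if (reader == null) {
        if (!Files.exists(backingFile)) {
          return false;
        }
        reader = Files.newBufferedReader(backingFile);
      }
      nextElement = reader.readLine();
      return nextElement != null;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public String next() {
    if (nextElement == null && !hasNext()) {
      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
    }
    String retVal = nextElement;
    nextElement = null;
    return retVal;
  }

  @Override
  public void close() throws IOException {
    if (reader != null) {
      reader.close();
    }
    if (writer != null) {
      writer.close();
    }
  }
}
```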