diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c172643891..a5c4b76e0b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -542,10 +542,6 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Number of threads that will be used to dump partition data information during repl dump."), REPL_RUN_DATA_COPY_TASKS_ON_TARGET("hive.repl.run.data.copy.tasks.on.target", true, "Indicates whether replication should run data copy tasks during repl load operation."), - REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000, - "This config indicates threshold for the maximum number of data copy locations to be kept in memory. \n" - + "When the config 'hive.repl.run.data.copy.tasks.on.target' is set to true, this config " + - "is not considered."), REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false, "Indicates whether replication dump only metadata information or data + metadata. \n" + "This config makes hive.repl.include.external.tables config ineffective."), @@ -653,6 +649,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.HOURS), "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " + "the policy instance will be marked as failed and will need manual intervention to restart."), + REPL_COPY_ITERATOR_RETRY("hive.repl.copy.iterator.retry", true, + "Determines whether writes happen with retry upon encountering filesystem errors for data-copy iterator files."), REPL_LOAD_PARTITIONS_BATCH_SIZE("hive.repl.load.partitions.batch.size", 10000, "Provide the maximum number of partitions of a table that will be batched together during \n" + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. 
\n" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index efae01a812..4cf5a3245d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.junit.Assert; @@ -523,19 +524,21 @@ public static void insertForMerge(WarehouseInstance primary, String primaryDbNam return withClause; } - public static void assertExternalFileInfo(WarehouseInstance warehouseInstance, - List expected, Path externalTableInfoFile) throws IOException { + public static void assertExternalFileList(WarehouseInstance warehouseInstance, + List expected, Path externalTableFileList) throws IOException { DistributedFileSystem fileSystem = warehouseInstance.miniDFSCluster.getFileSystem(); - Assert.assertTrue(fileSystem.exists(externalTableInfoFile)); - InputStream inputStream = fileSystem.open(externalTableInfoFile); + Assert.assertTrue(fileSystem.exists(externalTableFileList)); + InputStream inputStream = fileSystem.open(externalTableFileList); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); Set tableNames = new HashSet<>(); for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] components = line.split(","); - Assert.assertEquals("The file should have tableName,base64encoded(data_location)", - 2, components.length); - tableNames.add(components[0]); + String[] components = line.split(DirCopyWork.URI_SEPARATOR); + Assert.assertEquals("The file should have sourcelocation#targetlocation#tblName", + 3, components.length); + tableNames.add(components[2]); + Assert.assertTrue(components[0].length() > 0); Assert.assertTrue(components[1].length() > 0); + Assert.assertTrue(components[2].length() > 0); } Assert.assertTrue(tableNames.containsAll(expected)); reader.close(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 84699ce192..2279a8fe54 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; @@ -69,7 +70,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_HIVE_BASE_DIR; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -1691,7 +1692,7 @@ public void testHdfsNSLazyCopyBootStrapExtTbls() throws Throwable { "load_time=2012-02-21 07%3A08%3A09.124"}) .dump(primaryDbName, clause); - assertExternalFileInfo(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, false, primary); + assertExternalFileList(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, primary); //SecurityException expected from DirCopyTask try{ replica.load(replicatedDbName, primaryDbName, clause); @@ -1773,7 +1774,7 @@ public void testHdfsNSLazyCopyIncrExtTbls() throws Throwable { "load_time=2012-02-21 07%3A08%3A09.124"}) .dump(primaryDbName, clause); - assertExternalFileInfo(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, true, primary); + assertExternalFileList(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, primary); //SecurityException expected from DirCopyTask try{ replica.load(replicatedDbName, primaryDbName, clause); @@ -1836,6 +1837,64 @@ public void testHdfsNameserviceWithDataCopy() throws Throwable { .verifyResults(new String[]{"2", "3"}); } + @Test + public void testReplWithRetryDisabledIterators() throws Throwable { + List clause = new ArrayList<>(); + //NS replacement parameters has no effect when data is also copied to staging + clause.add("'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='false'"); + clause.add("'" + HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY + "'='false'"); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)") + .dump(primaryDbName, clause); + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table1 values (3)") + .dump(primaryDbName, clause); + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[]{"1", "2", "3"}); + + clause.add("'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + tuple = primary.run("use " + primaryDbName) + .run("create external table ext_table1 (id int)") + .run("insert into ext_table1 values (3)") + .run("insert into ext_table1 values (4)") + .run("create external table ext_table2 (key int, value int) partitioned by (load_time timestamp)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.123') values(1,2)") + .run("insert into ext_table2 partition(load_time = '2012-02-21 07:08:09.124') values(1,3)") + .run("show partitions ext_table2") + 
.verifyResults(new String[]{ + "load_time=2012-02-21 07%3A08%3A09.123", + "load_time=2012-02-21 07%3A08%3A09.124"}) + .dump(primaryDbName, clause); + assertExternalFileList(Arrays.asList("ext_table1", "ext_table2"), tuple.dumpLocation, primary); + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"acid_table", "table1", "ext_table1", "ext_table2"}) + .run("select * from ext_table1") + .verifyResults(new String[]{"3", "4"}) + .run("select value from ext_table2") + .verifyResults(new String[]{"2", "3"}); + } + @Test public void testCreateFunctionWithHdfsNameservice() throws Throwable { Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar"); @@ -2222,20 +2281,20 @@ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPa return withClause; } + private void assertFalseExternalFileList(Path externalTableFileList) + throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + Assert.assertFalse(fileSystem.exists(externalTableFileList)); + } + /* * Method used from TestReplicationScenariosExclusiveReplica */ - private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental, + private void assertExternalFileList(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); - } else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); - } - ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); + Path externalTblFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); + ReplicationTestUtils.assertExternalFileList(warehouseInstance, expected, externalTblFileList); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index aee6234acb..7bf555a694 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -46,7 +46,6 @@ import java.util.Set; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; /** * Test replication scenarios with staging on replica. 
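The rewritten ReplicationTestUtils.assertExternalFileList above checks that every line of the external-table file list has three '#'-separated components, matching what DirCopyWork.convertToString() now emits (source location, target location, table name, joined by the newly public DirCopyWork.URI_SEPARATOR). A minimal standalone sketch of parsing one such entry; the example line is made up and not taken from a real dump:

    public final class FileListEntryParseDemo {
      // Same separator value the patch exposes as DirCopyWork.URI_SEPARATOR.
      private static final String URI_SEPARATOR = "#";

      public static void main(String[] args) {
        // Hypothetical entry; real entries are produced by DirCopyWork.convertToString().
        String line = "hdfs://ns1/warehouse/ext_table1#hdfs://ns2/warehouse/ext_table1#ext_table1";
        String[] components = line.split(URI_SEPARATOR);
        // Expected layout: sourceLocation#targetLocation#tableName
        System.out.println("source = " + components[0]);
        System.out.println("target = " + components[1]);
        System.out.println("table  = " + components[2]);
      }
    }
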
@@ -87,8 +86,8 @@ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwabl .run("insert into table t2 values (200)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -108,8 +107,8 @@ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwabl .run("insert into table t4 values (400)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -143,8 +142,8 @@ public void testTableLevelReplicationWithRemoteStaging() throws Throwable { .run("insert into table t2 values (200)") .dump(primaryDbName +".'t[0-9]+'", withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica); //verify table list verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(), @@ -171,8 +170,8 @@ public void testTableLevelReplicationWithRemoteStaging() throws Throwable { .run("insert into t5 partition(p=2) values(20)") .dump(primaryDbName + ".'t[0-9]+'", withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica); //verify table list verifyTableListForPolicy(replica.miniDFSCluster.getFileSystem(), @@ -214,8 +213,8 @@ public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable .run("insert into table t2 values (600)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -235,8 +234,8 @@ public void testDistCpCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwable .run("insert into table t4 values (800)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, 
withClauseOptions) .run("use " + replicatedDbName) @@ -271,8 +270,8 @@ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws Throwabl .run("insert into table t2 values (200)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -292,8 +291,8 @@ public void testDistCpCopyWithRemoteStagingAndCopyTaskOnSource() throws Throwabl .run("insert into table t4 values (400)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -328,8 +327,8 @@ public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws Throwable .run("insert into table t2 values (600)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -349,8 +348,8 @@ public void testDistCpCopyWithLocalStagingAndCopyTaskOnSource() throws Throwable .run("insert into table t4 values (800)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -384,8 +383,8 @@ public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable { .run("insert into table t2 values (600)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, replica); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -405,8 +404,8 @@ public void testRegularCopyRemoteStagingAndCopyTaskOnSource() throws Throwable { .run("insert into table t4 values (800)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, replica); 
replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -439,8 +438,8 @@ public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwabl .run("insert into table t2 values (600)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -460,8 +459,8 @@ public void testRegularCopyWithLocalStagingAndCopyTaskOnTarget() throws Throwabl .run("insert into table t4 values (800)") .dump(primaryDbName, withClauseOptions); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t3"), tuple.dumpLocation, primary); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -760,18 +759,12 @@ private void verifyTableDataExists(WarehouseInstance warehouse, Path dbDataPath, return confList; } - private void assertExternalFileInfo(List expected, String dumplocation, boolean isIncremental, + private void assertExternalFileList(List expected, String dumplocation, WarehouseInstance warehouseInstance) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath, FILE_NAME); - } else { - externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); - } - ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); + Path externalTableFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); + ReplicationTestUtils.assertExternalFileList(warehouseInstance, expected, externalTableFileList); } /* diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index a33e95c828..4964822559 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -76,7 +76,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -126,11 +125,10 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, withClause); - // the _external_tables_file info only should be created if external tables are to be 
replicated not otherwise - Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), - EximUtil.METADATA_PATH_NAME); + // the _file_list_external only should be created if external tables are to be replicated not otherwise + Path replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName)))); + .exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) @@ -148,11 +146,10 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t3 values (20)") .dump(primaryDbName, withClause); - // the _external_tables_file info only should be created if external tables are to be replicated not otherwise - metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), - EximUtil.METADATA_PATH_NAME); + // _file_list_external only should be created if external tables are to be replicated not otherwise + replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(null)))); + .exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -175,8 +172,8 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); + // verify that the external table filelist is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1", "t2"), tuple.dumpLocation); @@ -207,8 +204,8 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("create external table t4 as select id from t3") .dump(primaryDbName); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) @@ -225,8 +222,8 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("drop table t1") .dump(primaryDbName); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation); } @Test @@ -244,8 +241,8 @@ public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, lazyCopyClause); - // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); + // verify that the external table list is written correctly for bootstrap + assertExternalFileList(Arrays.asList("t1", "t2"), 
tuple.dumpLocation); @@ -276,8 +273,8 @@ public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable .run("create external table t4 as select id from t3") .dump(primaryDbName, lazyCopyClause); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, lazyCopyClause) .run("use " + replicatedDbName) @@ -513,7 +510,7 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='india') values ('bangalore')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false); + assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -536,7 +533,7 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, true); + assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -624,7 +621,7 @@ public void externalTableWithPartitionsInBatch() throws Throwable { .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false); + assertExternalFileList(Collections.singletonList("t2"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -652,7 +649,7 @@ public void externalTableIncrementalCheckpointing() throws Throwable { .run("insert into table t2 values (4)") .dump(primaryDbName, withClause); - assertExternalFileInfo(Arrays.asList(new String[]{"t1", "t2"}), tuple.dumpLocation, primaryDbName, false); + assertExternalFileList(Arrays.asList(new String[]{"t1", "t2"}), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -675,8 +672,8 @@ public void externalTableIncrementalCheckpointing() throws Throwable { .run("insert into table t3 values (8)") .dump(primaryDbName, withClause); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3"), incrementalDump1.dumpLocation, true); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t2", "t3"), incrementalDump1.dumpLocation); FileSystem fs = primary.miniDFSCluster.getFileSystem(); Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); @@ -710,7 +707,7 @@ public void externalTableIncrementalCheckpointing() throws Throwable { WarehouseInstance.Tuple incrementalDump2 = primary.dump(primaryDbName, withClause); assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation); assertTrue(fs.getFileStatus(metaDir).getModificationTime() > oldMetadirModTime); - assertExternalFileInfo(Arrays.asList("t2", "t3"), incrementalDump2.dumpLocation, true); + assertExternalFileList(Arrays.asList("t2", "t3"), 
incrementalDump2.dumpLocation); //first event meta is not rewritten for (Map.Entry entry: firstEventModTimeMap.entrySet()) { assertEquals((long)entry.getValue(), fs.getFileStatus(entry.getKey()).getModificationTime()); @@ -745,7 +742,7 @@ public void externalTableIncrementalReplication() throws Throwable { .run("alter table t1 add partition(country='us')") .dump(primaryDbName); - assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); + assertExternalFileList(Collections.singletonList("t1"), tuple.dumpLocation); // Add new data externally, to a partition, but under the partition level top directory // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD. @@ -789,9 +786,9 @@ public void externalTableIncrementalReplication() throws Throwable { outputStream.write("chennai\n".getBytes()); } - // Repl load with zero events but external tables location info should present. + // Repl load with zero events but external tables file list should present. tuple = primary.dump(primaryDbName); - assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); + assertExternalFileList(Collections.singletonList("t1"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) @@ -841,11 +838,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, dumpWithClause); - // the _external_tables_file info only should be created if external tables are to be replicated not otherwise - Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), - EximUtil.METADATA_PATH_NAME); + // the _file_list_external only should be created if external tables are to be replicated not otherwise + Path replHiveBasePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(metadataPath + relativeExtInfoPath(null)))); + .exists(new Path(replHiveBasePath, EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -867,13 +863,13 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .run("create table t4 as select * from t3") .dump(primaryDbName, dumpWithClause); - // the _external_tables_file info should be created as external tables are to be replicated. + // the _file_list_external should be created as external tables are to be replicated. Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hivePath + relativeExtInfoPath(null)))); + .exists(new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL))); - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation, true); + // verify that the external table list is written correctly for incremental + assertExternalFileList(Arrays.asList("t2", "t3"), tuple.dumpLocation); // _bootstrap directory should be created as bootstrap enabled on external tables. String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; @@ -1090,7 +1086,7 @@ public Table apply(@Nullable Table table) { } // Only table t2 should exist in the data location list file. 
- assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation, true); + assertExternalFileList(Collections.singletonList("t2"), tupleInc.dumpLocation); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. @@ -1464,8 +1460,8 @@ public void testDatabaseLevelCopy(boolean runCopyTasksOnTarget) // Check the task copied post bootstrap, It should have the database loc, // the table 'a' since that is outside of the default location, and the // 'c', since its partition is out of the default location. - assertExternalFileInfo(Arrays.asList(primaryDbName.toLowerCase(), "a", "c"), - tuple.dumpLocation, primaryDbName, false); + assertExternalFileList(Arrays.asList(primaryDbName.toLowerCase(), "a", "c"), + tuple.dumpLocation); // Add more data to tables and do a incremental run and create another // tables one inside and other outside default location. @@ -1520,9 +1516,9 @@ public void testDatabaseLevelCopy(boolean runCopyTasksOnTarget) // New table in the warehouse shouldn't be there but the table created // outside should be there, apart from the ones in the previous run. - assertExternalFileInfo( + assertExternalFileList( Arrays.asList(primaryDbName.toLowerCase(), "a", "c", "newout"), - tuple.dumpLocation, true); + tuple.dumpLocation); } @Test @@ -1570,8 +1566,7 @@ public void testDatabaseLevelCopyDisabled() throws Throwable { .run("select place from c where country='france'") .verifyResult("paris"); - assertExternalFileInfo(Arrays.asList("a", "b", "c"), tuple.dumpLocation, - primaryDbName, false); + assertExternalFileList(Arrays.asList("a", "b", "c"), tuple.dumpLocation); // Add more data to tables and do a incremental run and create another // tables one inside and other outside default location. 
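Across these tests the old bootstrap/incremental distinction is gone: every dump now writes a single _file_list_external directly under the dump's hive base directory, which is why the isIncremental flag and the per-database metadata path were dropped from the assertion helpers. A sketch of locating and reading that file through the Hadoop FileSystem API; the dump location, the "hive" base-directory name, and the "_file_list_external" file name are placeholders standing in for ReplUtils.REPL_HIVE_BASE_DIR and EximUtil.FILE_LIST_EXTERNAL:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ExternalFileListReaderDemo {
      public static void main(String[] args) throws Exception {
        // Placeholder values; the tests derive these from the dump tuple and ReplUtils/EximUtil constants.
        Path externalTableFileList =
            new Path(new Path("/tmp/repl/dump1", "hive"), "_file_list_external");
        FileSystem fs = externalTableFileList.getFileSystem(new Configuration());
        if (!fs.exists(externalTableFileList)) {
          System.out.println("No external tables were dumped.");
          return;
        }
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(fs.open(externalTableFileList)))) {
          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            System.out.println(line); // one source#target#tableName entry per line
          }
        }
      }
    }
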
@@ -1610,34 +1605,15 @@ public void testDatabaseLevelCopyDisabled() throws Throwable { .run("select id from newout") .verifyResult("2"); - assertExternalFileInfo(Arrays + assertExternalFileList(Arrays .asList("a", "b", "c", "newin", "newout"), - tuple.dumpLocation, true); + tuple.dumpLocation); } - private void assertExternalFileInfo(List expected, String dumplocation, - boolean isIncremental) throws IOException { - assertExternalFileInfo(expected, dumplocation, null, isIncremental); - } - private void assertExternalFileInfo(List expected, String dumplocation, String dbName, - boolean isIncremental) + private void assertExternalFileList(List expected, String dumplocation) throws IOException { Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); - Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); - Path externalTableInfoFile; - if (isIncremental) { - externalTableInfoFile = new Path(hivePath + relativeExtInfoPath(dbName)); - } else { - externalTableInfoFile = new Path(metadataPath + relativeExtInfoPath(dbName)); - } - ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); - } - - private String relativeExtInfoPath(String dbName) { - if (dbName == null) { - return File.separator + FILE_NAME; - } else { - return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; - } + Path externalTableFileList = new Path(hivePath, EximUtil.FILE_LIST_EXTERNAL); + ReplicationTestUtils.assertExternalFileList(primary, expected, externalTableFileList); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index 357302c809..5c93f3d3cb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -48,7 +48,6 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_HIVE_BASE_DIR; import static org.junit.Assert.assertFalse; @@ -103,9 +102,9 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, dumpWithClause); - // the _external_tables_file info only should be created if external tables are to be replicated not otherwise - assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + // the _file_list_external only should be created if external tables are to be replicated not otherwise + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("repl status " + replicatedDbName) @@ -123,9 +122,10 @@ public void replicationWithoutExternalTables() throws Throwable { .run("insert into table t3 values (20)") .dump(primaryDbName, dumpWithClause); - // the _external_tables_file data only should be created if external 
tables are to be replicated not otherwise + // the _file_list_external should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -148,8 +148,9 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dumpWithCommand("repl dump " + primaryDbName); - // verify that the external table info is not written as metadata only replication - assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + // verify that the external table list is not written as metadata only replication + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); List withClauseOptions = ReplicationTestUtils.includeExternalTableClause(true); @@ -177,8 +178,9 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("create external table t4 as select id from t3") .dumpWithCommand("repl dump " + primaryDbName); - // verify that the external table info is written correctly for incremental - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + // verify that the external table list is written correctly for incremental + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -193,8 +195,9 @@ public void externalTableReplicationWithDefaultPaths() throws Throwable { .run("drop table t1") .dumpWithCommand("repl dump " + primaryDbName); - // verify that the external table info is written correctly for incremental - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + // verify that the external table list is written correctly for incremental + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); } @Test @@ -267,7 +270,9 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='india') values ('bangalore')") .dumpWithCommand("repl dump " + primaryDbName); - assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); + replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -290,7 +295,8 @@ public void externalTableWithPartitions() throws Throwable { .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -372,7 +378,8 @@ public void externalTableIncrementalReplication() throws Throwable { .run("alter table t1 add partition(country='us')") .dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + 
assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); // Add new data externally, to a partition, but under the partition level top directory // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. @@ -406,7 +413,8 @@ public void externalTableIncrementalReplication() throws Throwable { // Repl load with zero events but external tables location info should present. tuple = primary.dump(primaryDbName); - assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME)); + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -455,9 +463,9 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .run("insert into table t2 partition(country='france') values ('paris')") .dump(primaryDbName, dumpWithClause); - // the _external_tables_file info only should be created if external tables are to be replicated not otherwise - assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + // the file _file_list_external only should be created if external tables are to be replicated not otherwise + assertFalseExternalFileList(new Path(new Path(tuple.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, primaryDbName, loadWithClause) .status(replicatedDbName) @@ -483,14 +491,13 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { .dump(primaryDbName, dumpWithClause); String hiveDumpDir = tuple.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; - // the _external_tables_file info should be created as external tables are to be replicated. + // the _file_list_external should be created as external tables are to be replicated. assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); - - // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3"), - new Path(hiveDumpDir, FILE_NAME)); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); + // verify that the external table list is written correctly for incremental + ReplicationTestUtils.assertExternalFileList(primary, Arrays.asList("t2", "t3"), + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); // _bootstrap directory should be created as bootstrap enabled on external tables. Path dumpPath = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME); @@ -583,8 +590,8 @@ public Table apply(@Nullable Table table) { } // Only table t2 should exist in the data location list file. - String hiveDumpDir = tupleInc.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; - assertFalseExternalFileInfo(new Path(hiveDumpDir, FILE_NAME)); + assertFalseExternalFileList(new Path(new Path(tupleInc.dumpLocation, + REPL_HIVE_BASE_DIR), EximUtil.FILE_LIST_EXTERNAL)); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. 
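Besides the new hive.repl.copy.iterator.retry flag for the write side of the data-copy file lists, this patch also makes the read side resilient: in the ReplDumpWork hunks near the end, task creation from the copy-path iterators is wrapped in a Retryable that retries on UncheckedIOException and, on each re-attempt, skips the entries that already produced tasks. A simplified, framework-free sketch of that skip-on-retry idea; it assumes the entry source can be re-read from the start on every attempt, and the names are hypothetical (the real code uses Hive's Retryable and TaskFactory):

    import java.io.UncheckedIOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Supplier;

    public final class SkipOnRetryDemo {
      // Collects entries, retrying on UncheckedIOException; entries collected in an
      // earlier attempt are skipped instead of being processed twice.
      static List<String> collectWithRetry(Supplier<Iterator<String>> entrySource, int maxAttempts) {
        List<String> collected = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
          Iterator<String> entries = entrySource.get();
          int numEntriesToSkip = collected.size();
          try {
            while (entries.hasNext()) {
              String entry = entries.next();
              if (numEntriesToSkip > 0) {
                numEntriesToSkip--;     // already handled in a previous attempt
                continue;
              }
              collected.add(entry);     // stands in for creating and tracking a copy task
            }
            return collected;
          } catch (UncheckedIOException e) {
            if (attempt >= maxAttempts) {
              throw e;                  // retries exhausted; surface the failure
            }
          }
        }
      }

      public static void main(String[] args) {
        List<String> source = Arrays.asList("entry-1", "entry-2", "entry-3");
        System.out.println(collectWithRetry(source::iterator, 3));
      }
    }
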
@@ -632,14 +639,9 @@ public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { .verifyResult(inc2Tuple.lastReplicationId); } - private void assertFalseExternalFileInfo(Path externalTableInfoFile) + private void assertFalseExternalFileList(Path externalTableFileList) throws IOException { DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); - Assert.assertFalse(fileSystem.exists(externalTableInfoFile)); - } - - private void assertExternalFileInfo(List expected, Path externalTableInfoFile) - throws IOException { - ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); + Assert.assertFalse(fileSystem.exists(externalTableFileList)); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index c4f2b5fe68..57e599c87d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -48,7 +48,6 @@ import java.io.InputStreamReader; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -509,14 +508,13 @@ public void testBootstrapExternalTablesWithIncludeAndExcludeList() throws Throwa .dump(replPolicy, dumpWithClause); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; - Path metaDataPath = new Path(hiveDumpDir, EximUtil.METADATA_PATH_NAME); - // the _external_tables_file info should be created as external tables are to be replicated. + // the _file_list_external should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); - // Verify that the external table info contains only table "a2". - ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME)); + // Verify that _file_list_external contains only table "a2". + ReplicationTestUtils.assertExternalFileList(primary, Arrays.asList("a2"), + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -550,13 +548,13 @@ public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; - // the _external_tables_file info should be created as external tables are to be replicated. + // the _file_list_external should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); - // Verify that the external table info contains only table "a2". 
- ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(hiveDumpDir, FILE_NAME)); + // Verify that _file_list_external contains only table "a2". + ReplicationTestUtils.assertExternalFileList(primary, Arrays.asList("a2"), + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -699,13 +697,13 @@ public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() throws loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; - // the _external_tables_file info should be created as external tables are to be replicated. + // _file_list_external should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(hiveDumpDir, FILE_NAME))); + .exists(new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL))); - // Verify that the external table info contains table "a2" and "c2". - ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c2"), - new Path(hiveDumpDir, FILE_NAME)); + // Verify that _file_list_external contains table "a2" and "c2". + ReplicationTestUtils.assertExternalFileList(primary, Arrays.asList("a2", "c2"), + new Path(hiveDumpDir, EximUtil.FILE_LIST_EXTERNAL)); // Verify if the expected tables are bootstrapped. verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index c232dfa0b0..343cbe4faa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -30,8 +30,9 @@ Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class DirCopyWork implements Serializable, StringConvertibleObject { - private static final String URI_SEPARATOR = "#"; + public static final String URI_SEPARATOR = "#"; private static final long serialVersionUID = 1L; + private String tableName; private Path fullyQualifiedSourcePath; private Path fullyQualifiedTargetPath; private String dumpDirectory; @@ -42,7 +43,8 @@ public DirCopyWork(ReplicationMetricCollector metricCollector, String dumpDirect this.dumpDirectory = dumpDirectory; } - public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { + public DirCopyWork(String tableName, Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { + this.tableName = tableName; this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; this.fullyQualifiedTargetPath = fullyQualifiedTargetPath; } @@ -75,7 +77,9 @@ public String convertToString() { StringBuilder objInStr = new StringBuilder(); objInStr.append(fullyQualifiedSourcePath) .append(URI_SEPARATOR) - .append(fullyQualifiedTargetPath); + .append(fullyQualifiedTargetPath) + .append(URI_SEPARATOR) + .append(tableName); return objInStr.toString(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 0851811a8b..0154fb4577 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.exec.util.Retryable; @@ -116,7 +117,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK; import static org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; @@ -265,7 +265,7 @@ private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOExc } } - private void initiateDataCopyTasks() throws SemanticException { + private void initiateDataCopyTasks() throws SemanticException, IOException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); if (childTasks == null) { childTasks = new ArrayList<>(); @@ -633,9 +633,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive dmd.setReplScope(work.replScope); dmd.write(true); } - int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); - try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); - FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL)) { // Examine all the tables if required. if (shouldExamineTablesToDump() || (tableList != null)) { // If required wait more for any transactions open at the time of starting the ACID bootstrap. @@ -657,39 +656,38 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive Path dbRootMetadata = new Path(metadataPath, dbName); Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName); boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - try (Writer writer = new Writer(dumpRoot, conf)) { - Path dbPath = null; - boolean isSingleCopyTaskForExternalTables = - conf.getBoolVar(REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK) - && work.replScope.includeAllTables(); - boolean isExternalTablePresent = false; - for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { - try { - Table table = hiveDb.getTable(dbName, tableName); - - // Dump external table locations if required. - if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) - && shouldDumpExternalTableLocation()) { - dbPath = new Path(hiveDb.getDatabase(dbName).getLocationUri()); - writer.dataLocationDump(table, extTableFileList, dbPath, - !isSingleCopyTaskForExternalTables, conf); - isExternalTablePresent=true; - } - - // Dump the table to be bootstrapped if required. 
- if (shouldBootstrapDumpTable(table)) { - HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, - hiveDb, tableTuple, managedTblList, dataCopyAtLoad); - } - if (tableList != null && isTableSatifiesConfig(table)) { - tableList.add(tableName); - } - } catch (InvalidTableException te) { - // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. - // Just log a debug message and skip it. - LOG.debug(te.getMessage()); + ReplExternalTables externalTablesWriter = new ReplExternalTables(conf); + Path dbPath = null; + boolean isSingleCopyTaskForExternalTables = + conf.getBoolVar(REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK) + && work.replScope.includeAllTables(); + boolean isExternalTablePresent = false; + for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { + try { + Table table = hiveDb.getTable(dbName, tableName); + + // Dump external table locations if required. + if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) + && shouldDumpExternalTableLocation()) { + dbPath = new Path(hiveDb.getDatabase(dbName).getLocationUri()); + externalTablesWriter.dataLocationDump(table, extTableFileList, dbPath, + !isSingleCopyTaskForExternalTables, conf); + isExternalTablePresent=true; + } + + // Dump the table to be bootstrapped if required. + if (shouldBootstrapDumpTable(table)) { + HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(table); + dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId, + hiveDb, tableTuple, managedTblList, dataCopyAtLoad); + } + if (tableList != null && isTableSatifiesConfig(table)) { + tableList.add(tableName); } + } catch (InvalidTableException te) { + // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. + LOG.debug(te.getMessage()); } // if it is not a table level replication, add a single task for // the database default location for external tables... @@ -697,7 +695,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive && isSingleCopyTaskForExternalTables) { // Using the lower case of the database name, to keep it // consistent with the name used during bootstrap. - writer.dbLocationDump(dbName.toLowerCase(), dbPath, extTableFileList, conf); + externalTablesWriter.dbLocationDump(dbName.toLowerCase(), dbPath, extTableFileList, conf); } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); @@ -888,9 +886,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) metadataPath.getFileSystem(conf).delete(metadataPath, true); } List functionsBinaryCopyPaths = Collections.emptyList(); - int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); - try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); - FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) { + try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST); + FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL)) { for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); // TODO : Currently we don't support separate table list for each database. @@ -940,7 +937,8 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; - try (Writer writer = new Writer(dbRoot, conf)) { + try { + ReplExternalTables externalTablesWriter = new ReplExternalTables(conf); boolean isSingleTaskForExternalDb = conf.getBoolVar(REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK) && work.replScope.includeAllTables(); @@ -962,7 +960,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { LOG.debug("Adding table {} to external tables list", tblName); - writer.dataLocationDump(tableTuple.object, extTableFileList, + externalTablesWriter.dataLocationDump(tableTuple.object, extTableFileList, new Path(db.getLocationUri()), !isSingleTaskForExternalDb, conf); isExternalTablePresent = true; } @@ -984,7 +982,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) // the database default location for external tables. 
if (isExternalTablePresent && shouldDumpExternalTableLocation() && isSingleTaskForExternalDb) { - writer.dbLocationDump(dbName, new Path(db.getLocationUri()), extTableFileList, conf); + externalTablesWriter.dbLocationDump(dbName, new Path(db.getLocationUri()), extTableFileList, conf); } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); } catch (Exception e) { @@ -1036,9 +1034,9 @@ private void updateReplSourceFor(Hive hiveDb, String dbName, Database db, hiveDb.alterDatabase(dbName, db); } - private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) { + private FileList createTableFileList(Path dumpRoot, String fileName) { Path backingFile = new Path(dumpRoot, fileName); - return new FileList(backingFile, cacheSize, conf); + return new FileList(backingFile, conf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 47051e41ee..4530c7e465 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -21,23 +21,29 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, @@ -184,42 +190,86 @@ public void setResultValues(List resultValues) { this.resultValues = resultValues; } - public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException { if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { - DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString()); - dirCopyWork.loadFromString(externalTblCopyPathIterator.next()); - Task task = TaskFactory.get(dirCopyWork, conf); - tasks.add(task); - tracker.addTask(task); - LOG.debug("added task for {}", dirCopyWork); + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(UncheckedIOException.class).build(); + try { + retryable.executeCallable((Callable) ()-> { + try{ + int numEntriesToSkip = tasks == null ? 
0 : tasks.size(); + while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + if(numEntriesToSkip > 0) { + //skip tasks added in previous attempts of this retryable block + externalTblCopyPathIterator.next(); + numEntriesToSkip--; + continue; + } + DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, currentDumpPath.toString()); + dirCopyWork.loadFromString(externalTblCopyPathIterator.next()); + Task task = TaskFactory.get(dirCopyWork, conf); + tasks.add(task); + tracker.addTask(task); + LOG.debug("added task for {}", dirCopyWork); + } + } catch (UncheckedIOException e) { + LOG.error("Reading entry for data copy failed for external tables, attempting retry.", e); + throw e; + } + return null; + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); } return tasks; } - public List> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) { + public List> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException { if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (managedTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { - ReplicationSpec replSpec = new ReplicationSpec(); - replSpec.setIsReplace(true); - replSpec.setInReplicationScope(true); - EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec); - managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next()); - //If its incremental, in checkpointing case, dump dir may exist. We will delete the event dir. - //In case of bootstrap checkpointing we will not delete the entire dir and just do a sync - Task copyTask = ReplCopyTask.getDumpCopyTask( - managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), - managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, !isBootstrap, - getCurrentDumpPath().toString(), getMetricCollector()); - tasks.add(copyTask); - tracker.addTask(copyTask); - LOG.debug("added task for {}", managedTableCopyPath); + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(UncheckedIOException.class).build(); + try { + retryable.executeCallable((Callable) ()-> { + try{ + int numEntriesToSkip = tasks == null ? 0 : tasks.size(); + while (managedTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + if(numEntriesToSkip > 0) { + //skip tasks added in previous attempts of this retryable block + managedTblCopyPathIterator.next(); + numEntriesToSkip--; + continue; + } + ReplicationSpec replSpec = new ReplicationSpec(); + replSpec.setIsReplace(true); + replSpec.setInReplicationScope(true); + EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec); + managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next()); + //If its incremental, in checkpointing case, dump dir may exist. We will delete the event dir. 
+ //In case of bootstrap checkpointing we will not delete the entire dir and just do a sync + Task copyTask = ReplCopyTask.getDumpCopyTask( + managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), + managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, !isBootstrap, + getCurrentDumpPath().toString(), getMetricCollector()); + tasks.add(copyTask); + tracker.addTask(copyTask); + LOG.debug("added task for {}", managedTableCopyPath); + } + } catch (UncheckedIOException e) { + LOG.error("Reading entry for data copy failed for managed tables, attempting retry.", e); + throw e; + } + return null; + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); } return tasks; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 2f11ab2044..11d596e17f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.exec.repl; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -28,7 +27,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.FileList; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -39,35 +37,112 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.Closeable; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.StringWriter; import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -/** - * Format of the file used to dump information about external tables: - *

- * table_name1,[base64Encoded(table_dir_location)]\n - * - * The file generated here is explicitly used for data copy of external tables and hence handling of - * writing this file is different than regular event handling for replication based on the conditions - * specified in {@link org.apache.hadoop.hive.ql.parse.repl.dump.Utils#shouldReplicate} - */ -public final class ReplExternalTables { +public class ReplExternalTables { private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class); - private static final String FIELD_SEPARATOR = ","; - public static final String FILE_NAME = "_external_tables_info"; + private HiveConf hiveConf; + private boolean includeExternalTables; + private boolean dumpMetadataOnly; + + public ReplExternalTables(HiveConf conf) { + this.hiveConf = conf; + this.includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); + this.dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); + } - private ReplExternalTables(){} + private boolean shouldWrite() { + return !dumpMetadataOnly && includeExternalTables; + } + + /** + * Dumps a single line per external table. It can include additional lines for the same + * table if the table is partitioned and a partition location is outside the table location. + * Entries for all such locations are added to the supplied file list. + */ + void dataLocationDump(Table table, FileList fileList, + Path dbLoc, boolean isTableLevelReplication, HiveConf conf) + throws InterruptedException, IOException, HiveException { + if (!shouldWrite()) { + return; + } + if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + throw new IllegalArgumentException( + "only External tables can be written via this writer, provided table is " + table + .getTableType()); + } + Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); + if (isTableLevelReplication || !FileUtils + .isPathWithinSubtree(table.getDataLocation(), dbLoc)) { + dirLocationToCopy(table.getTableName(), fileList, fullyQualifiedDataLocation, conf); + } + if (table.isPartitioned()) { + List partitions; + try { + partitions = Hive.get(hiveConf).getPartitions(table); + } catch (HiveException e) { + if (e.getCause() instanceof NoSuchObjectException) { + // If the table is dropped while the dump is in progress, just skip the partition data location dump + LOG.debug(e.getMessage()); + return; + } + throw e; + } + + for (Partition partition : partitions) { + boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree( + partition.getDataLocation(), table.getDataLocation() + ); + if (partitionLocOutsideTableLoc) { + // check if the entire db location is getting copied, then the + // partition isn't inside the db location, if so we can skip + // copying this separately.
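
For illustration only (this is not part of the patch), the location-selection rules that dataLocationDump applies, and which the partition check continues just below, can be condensed into a small standalone sketch. Plain strings stand in for Hadoop Path objects, isWithin is a simplified stand-in for FileUtils.isPathWithinSubtree, and a real run would additionally add one database-level entry when the single-copy-task mode is enabled.

import java.util.ArrayList;
import java.util.List;

// Sketch of which directories receive their own data-copy entry.
public final class CopyLocationSketch {

  // Simplified stand-in for FileUtils.isPathWithinSubtree.
  static boolean isWithin(String candidate, String root) {
    return candidate.startsWith(root.endsWith("/") ? root : root + "/");
  }

  static List<String> locationsToCopy(String dbLoc, String tableLoc,
      List<String> partitionLocs, boolean tableLevelReplication) {
    List<String> entries = new ArrayList<>();
    // The table directory gets its own entry unless a single db-level task covers it.
    if (tableLevelReplication || !isWithin(tableLoc, dbLoc)) {
      entries.add(tableLoc);
    }
    for (String partLoc : partitionLocs) {
      boolean outsideTable = !isWithin(partLoc, tableLoc);
      // Skip the partition if the single db-level task already covers it.
      if (outsideTable && !tableLevelReplication && isWithin(partLoc, dbLoc)) {
        outsideTable = false;
      }
      if (outsideTable) {
        entries.add(partLoc);
      }
    }
    return entries;
  }

  public static void main(String[] args) {
    // Only the partition living outside the warehouse needs its own entry here;
    // the single db-level task would cover the table and the first partition.
    System.out.println(locationsToCopy("/warehouse/db1", "/warehouse/db1/t1",
        List.of("/warehouse/db1/t1/p=1", "/data/external/p=2"), false));
  }
}
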
+ if (!isTableLevelReplication && FileUtils + .isPathWithinSubtree(partition.getDataLocation(), dbLoc)) { + partitionLocOutsideTableLoc = false; + } + } + if (partitionLocOutsideTableLoc) { + fullyQualifiedDataLocation = PathBuilder + .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); + dirLocationToCopy(table.getTableName(), fileList, fullyQualifiedDataLocation, conf); + } + } + } + } + + void dbLocationDump(String dbName, Path dbLocation, FileList fileList, + HiveConf conf) throws Exception { + Path fullyQualifiedDataLocation = PathBuilder + .fullyQualifiedHDFSUri(dbLocation, FileSystem.get(hiveConf)); + dirLocationToCopy(dbName, fileList, fullyQualifiedDataLocation, conf); + } + + private void dirLocationToCopy(String tableName, FileList fileList, Path sourcePath, HiveConf conf) + throws HiveException, IOException { + Path basePath = getExternalTableBaseDir(conf); + Path targetPath = externalTableDataPath(conf, basePath, sourcePath); + //Here, when src and target are HA clusters with same NS, then sourcePath would have the correct host + //whereas the targetPath would have an host that refers to the target cluster. This is fine for + //data-copy running during dump as the correct logical locations would be used. But if data-copy runs during + //load, then the remote location needs to point to the src cluster from where the data would be copied and + //the common original NS would suffice for targetPath. + if (hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE) && + hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)) { + String remoteNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname); + if (StringUtils.isEmpty(remoteNS)) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid " + + remoteNS == null ? "null" : remoteNS, ReplUtils.REPL_HIVE_SERVICE)); + } + targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost())); + sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS)); + } + fileList.add(new DirCopyWork(tableName, sourcePath, targetPath).convertToString()); + } public static String externalTableLocation(HiveConf hiveConf, String location) throws SemanticException { Path basePath = getExternalTableBaseDir(hiveConf); @@ -110,255 +185,4 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path } return dataPath; } - - public static class Writer implements Closeable { - private static Logger LOG = LoggerFactory.getLogger(Writer.class); - private final HiveConf hiveConf; - private final Path writePath; - private final boolean includeExternalTables; - private final boolean dumpMetadataOnly; - private OutputStream writer; - - Writer(Path dbRoot, HiveConf hiveConf) throws IOException { - this.hiveConf = hiveConf; - writePath = new Path(dbRoot, FILE_NAME); - includeExternalTables = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES); - dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || - hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); - if (shouldWrite()) { - this.writer = writePath.getFileSystem(hiveConf).create(writePath); - } - } - - private boolean shouldWrite() { - return !dumpMetadataOnly && includeExternalTables; - } - - /** - * this will dump a single line per external table. 
it can include additional lines for the same - * table if the table is partitioned and the partition location is outside the table. - * It returns list of all the external table locations. - */ - void dataLocationDump(Table table, FileList fileList, - Path dbLoc, boolean isTableLevelReplication, HiveConf conf) - throws InterruptedException, IOException, HiveException { - if (!shouldWrite()) { - return; - } - if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - throw new IllegalArgumentException( - "only External tables can be writen via this writer, provided table is " + table - .getTableType()); - } - Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); - if (isTableLevelReplication || !FileUtils - .isPathWithinSubtree(table.getDataLocation(), dbLoc)) { - write(lineFor(table.getTableName(), fullyQualifiedDataLocation, - hiveConf)); - dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); - } - if (table.isPartitioned()) { - List partitions; - try { - partitions = Hive.get(hiveConf).getPartitions(table); - } catch (HiveException e) { - if (e.getCause() instanceof NoSuchObjectException) { - // If table is dropped when dump in progress, just skip partitions data location dump - LOG.debug(e.getMessage()); - return; - } - throw e; - } - - for (Partition partition : partitions) { - boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree( - partition.getDataLocation(), table.getDataLocation() - ); - if (partitionLocOutsideTableLoc) { - // check if the entire db location is getting copied, then the - // partition isn't inside the db location, if so we can skip - // copying this separately. - if (!isTableLevelReplication && FileUtils - .isPathWithinSubtree(partition.getDataLocation(), dbLoc)) { - partitionLocOutsideTableLoc = false; - } - } - if (partitionLocOutsideTableLoc) { - fullyQualifiedDataLocation = PathBuilder - .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); - write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); - dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); - } - } - } - } - - void dbLocationDump(String dbName, Path dbLocation, FileList fileList, - HiveConf conf) throws Exception { - Path fullyQualifiedDataLocation = PathBuilder - .fullyQualifiedHDFSUri(dbLocation, FileSystem.get(hiveConf)); - write(lineFor(dbName, fullyQualifiedDataLocation, hiveConf)); - dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf); - } - - private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf) - throws HiveException { - Path basePath = getExternalTableBaseDir(conf); - Path targetPath = externalTableDataPath(conf, basePath, sourcePath); - //Here, when src and target are HA clusters with same NS, then sourcePath would have the correct host - //whereas the targetPath would have an host that refers to the target cluster. This is fine for - //data-copy running during dump as the correct logical locations would be used. But if data-copy runs during - //load, then the remote location needs to point to the src cluster from where the data would be copied and - //the common original NS would suffice for targetPath. 
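
Both the new dirLocationToCopy above and the old writer code being removed here remap the nameservice of the copy paths when source and target are HA clusters that share a logical nameservice. A rough, self-contained sketch of that authority swap using java.net.URI follows; the patch itself relies on Utils.replaceHost, so this only approximates the effect.

import java.net.URI;
import java.net.URISyntaxException;

// Sketch: swap the authority (nameservice/host) of a fully qualified HDFS URI,
// roughly what the replaceHost calls in this patch achieve for HA nameservice remapping.
public final class ReplaceAuthoritySketch {
  static String replaceAuthority(String path, String newAuthority) throws URISyntaxException {
    URI uri = new URI(path);
    return new URI(uri.getScheme(), newAuthority, uri.getPath(),
        uri.getQuery(), uri.getFragment()).toString();
  }

  public static void main(String[] args) throws URISyntaxException {
    // The source path must keep pointing at the remote nameservice when data copy runs on the target.
    System.out.println(replaceAuthority("hdfs://ns-target/warehouse/db1/t1", "ns-remote"));
    // prints hdfs://ns-remote/warehouse/db1/t1
  }
}
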
- if(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE) && - hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)) { - String remoteNS = hiveConf.get(HiveConf.ConfVars.REPL_HA_DATAPATH_REPLACE_REMOTE_NAMESERVICE_NAME.varname); - if (StringUtils.isEmpty(remoteNS)) { - throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE - .format("Configuration 'hive.repl.ha.datapath.replace.remote.nameservice.name' is not valid " - + remoteNS == null ? "null" : remoteNS, ReplUtils.REPL_HIVE_SERVICE)); - } - targetPath = new Path(Utils.replaceHost(targetPath.toString(), sourcePath.toUri().getHost())); - sourcePath = new Path(Utils.replaceHost(sourcePath.toString(), remoteNS)); - } - fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString()); - } - - private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) - throws IOException, SemanticException { - StringWriter lineToWrite = new StringWriter(); - lineToWrite.append(tableName).append(FIELD_SEPARATOR); - Path dataLocation = - PathBuilder.fullyQualifiedHDFSUri(dataLoc, dataLoc.getFileSystem(hiveConf)); - byte[] encodedBytes = Base64.getEncoder() - .encode(dataLocation.toString().getBytes(StandardCharsets.UTF_8)); - String encodedPath = new String(encodedBytes, StandardCharsets.UTF_8); - lineToWrite.append(encodedPath).append("\n"); - return lineToWrite.toString(); - } - - private void write(String line) throws SemanticException { - Retryable retryable = Retryable.builder() - .withHiveConf(hiveConf) - .withRetryOnException(IOException.class).build(); - try { - retryable.executeCallable((Callable) () -> { - try { - writer.write(line.getBytes(StandardCharsets.UTF_8)); - } catch (IOException e) { - writer = openWriterAppendMode(); - throw e; - } - return null; - }); - } catch (Exception e) { - throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); - } - } - - private OutputStream openWriterAppendMode() { - try { - // not sure if the exception was due to a incorrect state within the writer hence closing it - close(); - return FileSystem.get(hiveConf).append(writePath); - } catch (IOException e1) { - String message = "there was an error to open the file {} in append mode"; - LOG.error(message, writePath.toString(), e1); - throw new IllegalStateException(message, e1); - } - } - - @Override - public void close() throws IOException { - if (writer != null) { - writer.close(); - } - } - } - - public static class Reader { - private static Logger LOG = LoggerFactory.getLogger(Reader.class); - private final HiveConf hiveConf; - private final Path rootReplLoadPath; - private final boolean isIncrementalPhase; - - public Reader(HiveConf conf, Path rootReplLoadPath, boolean isIncrementalPhase) { - this.hiveConf = conf; - this.rootReplLoadPath = rootReplLoadPath; - this.isIncrementalPhase = isIncrementalPhase; - } - - /** - * currently we only support dump/load of single db and the db Dump location cannot be inferred from - * the incoming dbNameOfPattern value since the load db name can be different from the target db Name - * hence traverse 1 level down from rootReplLoadPath to look for the file providing the hdfs locations. 
- */ - public Set sourceLocationsToCopy() throws IOException { - if (isIncrementalPhase) { - return sourceLocationsToCopy(new Path(rootReplLoadPath, FILE_NAME)); - } - - // this is bootstrap load path - Set locationsToCopy = new HashSet<>(); - FileSystem fileSystem = rootReplLoadPath.getFileSystem(hiveConf); - FileStatus[] fileStatuses = fileSystem.listStatus(rootReplLoadPath); - for (FileStatus next : fileStatuses) { - if (next.isDirectory()) { - Path externalTableInfoPath = new Path(next.getPath(), FILE_NAME); - locationsToCopy.addAll(sourceLocationsToCopy(externalTableInfoPath)); - } - } - return locationsToCopy; - } - - private BufferedReader reader(FileSystem fs, Path externalTableInfo) throws IOException { - InputStreamReader in = new InputStreamReader(fs.open(externalTableInfo), StandardCharsets.UTF_8); - return new BufferedReader(in); - } - - /** - * The SET of source locations should never be created based on the table Name in - * {@link #FILE_NAME} since there can be multiple entries for the same table in case the table is - * partitioned and the partitions are added by providing a separate Location for that partition, - * different than the table location. - */ - private Set sourceLocationsToCopy(Path externalTableInfo) throws IOException { - Set locationsToCopy = new HashSet<>(); - FileSystem fileSystem = externalTableInfo.getFileSystem(hiveConf); - if (!fileSystem.exists(externalTableInfo)) { - return locationsToCopy; - } - Retryable retryable = Retryable.builder() - .withHiveConf(hiveConf) - .withRetryOnException(IOException.class).build(); - try { - return retryable.executeCallable(() -> { - BufferedReader reader = null; - try { - reader = reader(fileSystem, externalTableInfo); - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] splits = line.split(FIELD_SEPARATOR); - locationsToCopy - .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8)); - } - return locationsToCopy; - } finally { - closeQuietly(reader); - } - }); - } catch (Exception e) { - throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); - } - } - - private static void closeQuietly(BufferedReader reader) { - try { - if (reader != null) { - reader.close(); - } - } catch (IOException e) { - LOG.debug("error while closing reader ", e); - } - } - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 03b43371c3..f37be53085 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -362,7 +362,7 @@ private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException if (dataCopyAtLoad) { if (work.getExternalTableDataCopyItr() == null) { Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILE_LIST_EXTERNAL); - try(FileList fileList = new FileList(extTableBackingFile, 0, conf)) { + try(FileList fileList = new FileList(extTableBackingFile, conf)) { work.setExternalTableDataCopyItr(fileList); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index a52dac27b4..daffce088c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; @@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; @@ -41,10 +43,12 @@ import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, @@ -192,18 +196,40 @@ public Long getDumpExecutionId() { return dumpExecutionId; } - public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + public List> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) throws IOException { if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) { return Collections.emptyList(); } List> tasks = new ArrayList<>(); - while (externalTableDataCopyItr.hasNext() && tracker.canAddMoreTasks()) { - DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, (new Path(dumpDirectory).getParent()).toString()); - dirCopyWork.loadFromString(externalTableDataCopyItr.next()); - Task task = TaskFactory.get(dirCopyWork, conf); - tasks.add(task); - tracker.addTask(task); - LOG.debug("Added task for {}", dirCopyWork); + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(UncheckedIOException.class).build(); + try { + retryable.executeCallable((Callable) ()-> { + try{ + int numEntriesToSkip = tasks == null ? 
0 : tasks.size(); + while (externalTableDataCopyItr.hasNext() && tracker.canAddMoreTasks()) { + if(numEntriesToSkip > 0) { + //skip entries added in the previous attempts of this retryable block + externalTableDataCopyItr.next(); + numEntriesToSkip--; + continue; + } + DirCopyWork dirCopyWork = new DirCopyWork(metricCollector, (new Path(dumpDirectory).getParent()).toString()); + dirCopyWork.loadFromString(externalTableDataCopyItr.next()); + Task task = TaskFactory.get(dirCopyWork, conf); + tasks.add(task); + tracker.addTask(task); + LOG.debug("Added task for {}", dirCopyWork); + } + } catch (UncheckedIOException e) { + LOG.error("Reading entry for data copy failed for external tables, attempting retry.", e); + throw e; + } + return null; + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); } LOG.info("Added total {} tasks for external table locations copy.", tasks.size()); return tasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index 4cf316a42c..363ba1fbd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -124,7 +124,7 @@ public boolean hasNext() { LocatedFileStatus next = remoteIterator.next(); // we want to skip this file, this also means there cant be a table with name represented // by constant ReplExternalTables.FILE_NAME or ReplUtils.REPL_TABLE_LIST_DIR_NAME (_tables) - if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME) || + if(next.getPath().toString().endsWith(EximUtil.FILE_LIST_EXTERNAL) || next.getPath().toString().endsWith(ReplUtils.REPL_TABLE_LIST_DIR_NAME)) { continue; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java index 50c621b782..257b2956f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java @@ -18,158 +18,225 @@ package org.apache.hadoop.hive.ql.exec.repl.util; -import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Callable; - -/** - * A file backed list of Strings which is in-memory till the threshold. 
- */ public class FileList implements AutoCloseable, Iterator { private static final Logger LOG = LoggerFactory.getLogger(FileList.class); - private static int fileListStreamerID = 0; - private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-"; - - private LinkedBlockingQueue cache; - private volatile boolean thresholdHit = false; - private int thresholdPoint; - private float thresholdFactor = 0.9f; - private Path backingFile; - private FileListStreamer fileListStreamer; - private String nextElement; - private boolean noMoreElement; + private final Path backingFile; + private String nextElement = null; + private String lastReadElement = null; private HiveConf conf; + private volatile boolean abortOperation = false; + private volatile boolean retryMode; private BufferedReader backingFileReader; + private volatile FSDataOutputStream backingFileWriter; - - public FileList(Path backingFile, int cacheSize, HiveConf conf) { + public FileList(Path backingFile, HiveConf conf) { this.backingFile = backingFile; this.conf = conf; - if (cacheSize > 0) { - // Cache size must be > 0 for this list to be used for the write operation. - this.cache = new LinkedBlockingQueue<>(cacheSize); - fileListStreamer = new FileListStreamer(cache, backingFile, conf); - thresholdPoint = getThreshold(cacheSize); - LOG.debug("File list backed by {} can be used for write operation.", backingFile); + this.retryMode = false; + } + + public void add(String entry) throws IOException { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY)) { + writeWithRetry(entry); } else { - thresholdHit = true; + writeEntry(entry); } } - @VisibleForTesting - FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue cache, HiveConf conf) { - this.backingFile = backingFile; - this.fileListStreamer = fileListStreamer; - this.cache = cache; - this.conf = conf; - thresholdPoint = getThreshold(cache.remainingCapacity()); + private synchronized void writeEntry(String entry) throws IOException { + //retry only during creating the file, no retry during writes + if (backingFileWriter == null) { + try { + Retryable retryable = buildRetryable(); + retryable.executeCallable((Callable) () -> { + if(this.abortOperation) { + return null; + } + backingFileWriter = getWriterCreateMode(); + return null; + }); + } catch (Exception e) { + this.abortOperation = true; + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); + } + } + if(this.abortOperation) { + return; + } + try { + backingFileWriter.writeBytes(getEntryWithNewline(entry)); + LOG.info("Writing entry {} to file list backed by {}", entry, backingFile); + } catch (IOException e) { + this.abortOperation = true; + LOG.error("Writing entry {} to file list {} failed.", entry, backingFile, e); + throw e; + } } - /** - * Only add operation is safe for concurrent operations. 
- */ - public void add(String entry) throws SemanticException { - if (thresholdHit && !fileListStreamer.isAlive()) { - throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString()); + private synchronized void writeWithRetry(String entry) throws IOException { + Retryable retryable = buildRetryable(); + try { + retryable.executeCallable((Callable) () -> { + if (this.abortOperation) { + return null; + } + try { + if (backingFileWriter == null) { + backingFileWriter = initWriter(); + } + backingFileWriter.writeBytes(getEntryWithNewline(entry)); + backingFileWriter.hflush(); + LOG.info("Writing entry {} to file list backed by {}", entry, backingFile); + } catch (IOException e) { + LOG.error("Writing entry {} to file list {} failed, attempting retry.", entry, backingFile, e); + this.retryMode = true; + close(); + throw e; + } + return null; + }); + } catch (Exception e) { + this.abortOperation = true; + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage())); } + } + + Retryable buildRetryable() { + return Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + } + + // Return the entry ensuring it ends with newline. + private String getEntryWithNewline(String entry) { + return new StringWriter() + .append(entry) + .append(System.lineSeparator()) + .toString(); + } + + FSDataOutputStream initWriter() throws IOException { + if(shouldAppend()) { + return getWriterAppendMode(); // append in retry-mode if file has been created already + } + else { + return getWriterCreateMode(); + } + } + + private boolean shouldAppend() throws IOException { + return backingFile.getFileSystem(conf).exists(backingFile) && this.retryMode; + } + + FSDataOutputStream getWriterCreateMode() throws IOException { try { - cache.put(entry); - } catch (InterruptedException e) { - throw new SemanticException(e); + return backingFile.getFileSystem(conf).create(backingFile); + } catch (IOException e) { + LOG.error("Error creating file {}", backingFile, e); + throw e; } - if (!thresholdHit && cache.size() >= thresholdPoint) { - initStoreToFile(cache.size()); + } + + FSDataOutputStream getWriterAppendMode() throws IOException { + try { + return backingFile.getFileSystem(conf).append(backingFile); + } catch (IOException e) { + LOG.error("Error opening {} in append mode", backingFile, e); + throw e; + } } @Override public boolean hasNext() { - if (!thresholdHit) { - return (cache != null && !cache.isEmpty()); - } - if (nextElement != null) { + /* + We assume that every add operation either adds an entry completely or doesn't add at all. + If this assumption changes then in the following check we should check for incomplete entries. + We remove duplicate entries assuming they are only written consecutively.
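
Because a failed write is retried from the top, the same entry can land in the backing file twice in a row; the iterator logic that follows therefore treats consecutive repeats as a single logical entry. A minimal sketch of that consecutive-duplicate skip, reading from an in-memory string rather than the HDFS-backed file used by FileList:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Sketch: read lines and collapse consecutive duplicates, mirroring the
// lastReadElement/nextElement check used by the iterator below.
public final class ConsecutiveDedupSketch {
  public static void main(String[] args) throws IOException {
    // Duplicates produced by retried writes only ever appear back to back.
    String backingFileContent = "a\nb\nb\nc\nc\nc\nd\n";
    try (BufferedReader reader = new BufferedReader(new StringReader(backingFileContent))) {
      String lastRead = null;
      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        if (line.equals(lastRead)) {
          continue; // skip the repeat written by a retried add
        }
        lastRead = line;
        System.out.println(lastRead); // prints a, b, c, d
      }
    }
  }
}
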
+ */ + if (nextElement != null && !nextElement.equals(lastReadElement)) { return true; + } else { + try { + do { + nextElement = readNextLine(); + if(nextElement != null && !nextElement.equals(lastReadElement)) { + return true; + } + } while (nextElement != null); + return false; + } catch (IOException e) { + nextElement = null; + lastReadElement = null; + backingFileReader = null; + throw new UncheckedIOException(e); + } } - if (noMoreElement) { - return false; - } - nextElement = readNextLine(); - if (nextElement == null) { - noMoreElement = true; - } - return !noMoreElement; } @Override public String next() { - if (!hasNext()) { + if ((nextElement == null || nextElement.equals(lastReadElement)) && !hasNext()) { throw new NoSuchElementException("No more element in the list backed by " + backingFile); } - String retVal = nextElement; + lastReadElement = nextElement; nextElement = null; - return thresholdHit ? retVal : cache.poll(); - } - - private synchronized void initStoreToFile(int cacheSize) { - if (!thresholdHit) { - fileListStreamer.setName(getNextID()); - fileListStreamer.setDaemon(true); - fileListStreamer.start(); - thresholdHit = true; - LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize); - } + return lastReadElement; } - private String readNextLine() { - String nextElement = null; - try { + private String readNextLine() throws IOException { + try{ + String nextElement; if (backingFileReader == null) { - FileSystem fs = FileSystem.get(backingFile.toUri(), conf); - if (fs.exists(backingFile)) { - backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile))); + FileSystem fs = backingFile.getFileSystem(conf); + if (!fs.exists(backingFile)) { + return null; } + backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile))); } - nextElement = (backingFileReader == null) ? null : backingFileReader.readLine(); + nextElement = backingFileReader.readLine(); + return nextElement; } catch (IOException e) { - LOG.error("Unable to read list from backing file " + backingFile, e); + LOG.error("Exception while reading file {}.", backingFile, e); + close(); + throw e; } - return nextElement; } @Override public void close() throws IOException { - if (thresholdHit && fileListStreamer != null) { - fileListStreamer.close(); - } - if (backingFileReader != null) { - backingFileReader.close(); - } - LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit); - } - - private static String getNextID() { - if (Integer.MAX_VALUE == fileListStreamerID) { - //reset the counter - fileListStreamerID = 0; + try { + if (backingFileReader != null) { + backingFileReader.close(); + } + if (backingFileWriter != null) { + backingFileWriter.close(); + } + LOG.info("Completed close for File List backed by:{}", backingFile); + } finally { + if(backingFileReader != null) { + backingFileReader = null; + } + if(backingFileWriter != null) { + backingFileWriter = null; + } } - fileListStreamerID++; - return FILE_LIST_STREAMER_PREFIX + fileListStreamerID; - } - - public int getThreshold(int cacheSize) { - boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - return copyAtLoad ? 
0 : (int)(cacheSize * thresholdFactor); } -} +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java deleted file mode 100644 index 36fd0555f8..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.repl.util; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.Closeable; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Streams the entries from a cache to the backed file. - */ - -public class FileListStreamer extends Thread implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class); - private static final long TIMEOUT_IN_SECS = 5L; - private volatile boolean signalTostop; - private LinkedBlockingQueue cache; - private Path backingFile; - private Configuration conf; - private BufferedWriter backingFileWriter; - private volatile boolean valid = true; - private final Object COMPLETION_LOCK = new Object(); - private volatile boolean completed = false; - - public FileListStreamer(LinkedBlockingQueue cache, Path backingFile, Configuration conf) { - this.cache = cache; - this.backingFile = backingFile; - this.conf = conf; - } - - BufferedWriter lazyInitWriter() throws IOException { - FileSystem fs = FileSystem.get(backingFile.toUri(), conf); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile))); - LOG.info("Initialized a file based store to save a list at: {}", backingFile); - return writer; - } - - public boolean isValid() { - return valid; - } - - // Blocks for remaining entries to be flushed to file. 
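
The streamer being deleted here drained a bounded in-memory queue on a background thread. The reworked FileList earlier in this patch instead writes each entry synchronously, creating the backing file once and reopening it in append mode when a write has to be retried. A self-contained sketch of that create-then-append-on-retry pattern with plain java.nio, standing in for the Hadoop FSDataOutputStream and Hive Retryable used by the real code:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: a writer that is created once and, after a failed write, is reopened
// in append mode so entries written before the failure are not lost.
public final class RetryingFileListSketch {
  private static final int MAX_ATTEMPTS = 3;
  private final Path backingFile;
  private OutputStream out;

  RetryingFileListSketch(Path backingFile) {
    this.backingFile = backingFile;
  }

  void add(String entry) throws IOException {
    byte[] line = (entry + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
    IOException lastFailure = null;
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        if (out == null) {
          // Create on first use, append when the file already exists from an earlier attempt.
          out = Files.exists(backingFile)
              ? Files.newOutputStream(backingFile, StandardOpenOption.APPEND)
              : Files.newOutputStream(backingFile, StandardOpenOption.CREATE_NEW);
        }
        out.write(line);
        out.flush();
        return;
      } catch (IOException e) {
        lastFailure = e;
        closeQuietly(); // drop the broken stream; the next attempt reopens it
      }
    }
    throw new IOException("retries exhausted for " + backingFile, lastFailure);
  }

  private void closeQuietly() {
    try {
      if (out != null) {
        out.close();
      }
    } catch (IOException ignored) {
      // best effort
    } finally {
      out = null;
    }
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("file-list-sketch", ".txt");
    Files.delete(file); // start from a non-existent file so the first add creates it
    RetryingFileListSketch list = new RetryingFileListSketch(file);
    list.add("entry-1");
    list.add("entry-2");
    System.out.println(Files.readAllLines(file)); // [entry-1, entry-2]
  }
}
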
- @Override - public void close() throws IOException { - signalTostop = true; - synchronized (COMPLETION_LOCK) { - while (motiveToWait()) { - try { - COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS)); - } catch (InterruptedException e) { - // no-op - } - } - } - if (!isValid()) { - throw new IOException("File list is not in a valid state:" + backingFile); - } - } - - private boolean motiveToWait() { - return !completed && valid; - } - - @Override - public void run() { - try { - backingFileWriter = lazyInitWriter(); - } catch (IOException e) { - valid = false; - throw new RuntimeException("Unable to initialize the file list streamer", e); - } - boolean exThrown = false; - while (!exThrown && (!signalTostop || !cache.isEmpty())) { - try { - String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS); - if (nextEntry != null) { - backingFileWriter.write(nextEntry); - backingFileWriter.newLine(); - LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile); - } - } catch (Exception iEx) { - if (!(iEx instanceof InterruptedException)) { - // not draining any more. Inform the producer to avoid OOM. - valid = false; - LOG.error("Exception while saving the list to file " + backingFile, iEx); - exThrown = true; - } - } - } - try{ - closeBackingFile(); - completed = true; - } finally { - synchronized (COMPLETION_LOCK) { - COMPLETION_LOCK.notify(); - } - } - LOG.info("Completed the file list streamer backed by: {}", backingFile); - } - - private void closeBackingFile() { - try { - backingFileWriter.close(); - LOG.debug("Closed the file list backing file: {}", backingFile); - } catch (IOException e) { - LOG.error("Exception while closing the file list backing file", e); - valid = false; - } - } - - @VisibleForTesting - boolean isInitialized() { - return backingFileWriter != null; - } -} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 6681102366..ffab49ab3d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -50,7 +50,6 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @RunWith(PowerMockRunner.class) @PrepareForTest({ Utils.class, ReplDumpTask.class}) @@ -126,9 +125,6 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L); when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false); when(HiveConf.getVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h"); - when(conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE)).thenReturn(10000); - - whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class)); whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class)); ReplDumpTask task = new StubReplDumpTask() { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java index 67000a0cb3..dc6319939d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java 
@@ -18,147 +18,266 @@ package org.apache.hadoop.hive.ql.exec.repl.util; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.LoggerFactory; -import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - - /** * Tests the File List implementation. */ -@RunWith(PowerMockRunner.class) +@RunWith(MockitoJUnitRunner.class) @PrepareForTest({LoggerFactory.class}) public class TestFileList { - @Mock - private BufferedWriter bufferedWriter; - - - @Test - public void testNoStreaming() throws Exception { - Object tuple[] = setupAndGetTuple(100, false); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - fileList.add("Entry1"); - fileList.add("Entry2"); - assertFalse(isStreamingToFile(fileListStreamer)); - } + HiveConf conf = new HiveConf(); + private FSDataOutputStream outStream; + private FSDataOutputStream testFileStream; + final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestFileList.class.getCanonicalName() + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + private Exception testException = new IOException("test"); @Test - public void testAlwaysStreaming() throws Exception { - Object tuple[] = setupAndGetTuple(100, true); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - assertFalse(fileListStreamer.isInitialized()); - fileList.add("Entry1"); - waitForStreamingInitialization(fileListStreamer); - assertTrue(isStreamingToFile(fileListStreamer)); - fileList.close(); - waitForStreamingClosure(fileListStreamer); - } + public void testConcurrentAdd() throws Exception { + FileList fileList = setupFileList(); + int numOfEntries = 1000; + int numOfThreads = 10; + ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); - @Test - public void testStreaminOnCacheHit() throws Exception { - Object tuple[] = setupAndGetTuple(5, false); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; - fileList.add("Entry1"); - fileList.add("Entry2"); - fileList.add("Entry3"); - Thread.sleep(5000L); - assertFalse(fileListStreamer.isInitialized()); - fileList.add("Entry4"); - fileList.add("Entry5"); - waitForStreamingInitialization(fileListStreamer); + for (int i=1; i<=numOfEntries; i++) { + executorService.submit(() -> { + try { + fileList.add("someEntry"); + } catch (IOException e) { + throw new RuntimeException("Unbale to add to file list."); + } + }); + } + executorService.awaitTermination(1, TimeUnit.MINUTES); fileList.close(); - 
waitForStreamingClosure(fileListStreamer); + ArgumentCaptor entryArgs = ArgumentCaptor.forClass(String.class); + Mockito.verify(testFileStream, Mockito.times(numOfEntries)).writeBytes(entryArgs.capture()); } @Test - public void testConcurrentAdd() throws Exception { - Object tuple[] = setupAndGetTuple(100, false); - FileList fileList = (FileList) tuple[0]; - FileListStreamer fileListStreamer = (FileListStreamer) tuple[1]; + public void testConcurrentAddWithAbort() throws Exception { + FileList fileList = setupFileList(false, false, false); int numOfEntries = 1000; int numOfThreads = 10; ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads); + final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED + .format(testException.getMessage()); for (int i=1; i<=numOfEntries; i++) { executorService.submit(() -> { try { fileList.add("someEntry"); - } catch (SemanticException e) { - throw new RuntimeException("Unbale to add to file list."); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg)); } }); } executorService.awaitTermination(1, TimeUnit.MINUTES); - waitForStreamingInitialization(fileListStreamer); fileList.close(); - waitForStreamingClosure(fileListStreamer); ArgumentCaptor entryArgs = ArgumentCaptor.forClass(String.class); - Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture()); + //retry exhausted should be encountered by the first thread, so the other threads do not write. + Mockito.verify(outStream, Mockito.times(1)).writeBytes(entryArgs.capture()); } - private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException { - long sleepTime = 1000L; - int iter = 0; - while (!fileListStreamer.isInitialized()) { - Thread.sleep(sleepTime); - iter++; - if (iter == 5) { - throw new IllegalStateException("File Streamer not initialized till 5s."); - } + @Test + public void testWriteRetryCreateFailure() throws Exception { + String testEntry = "someEntry"; + boolean retryOnCreate = true; + FileList fileList = setupFileList(retryOnCreate); + final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED + .format(testException.getMessage()); + + try { + fileList.add(testEntry); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg)); + } + + //the create keeps failing, so create should be called at least twice, + //writes and appends do not happen + Mockito.verify(fileList, Mockito.atLeast(2)).getWriterCreateMode(); + Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode(); + } + + @Test + public void testWriteNoRetry() throws Exception { + String testEntry = "someEntry"; + boolean retryOnCreate = false, retryOnWrite = false; + FileList fileList = setupFileList(retryOnCreate, retryOnWrite); + final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED + .format(testException.getMessage()); + + try { + fileList.add(testEntry); + } catch (IOException e) { + Assert.assertFalse(e.getMessage().contains(retryExhaustedMsg)); + Assert.assertTrue(e.getMessage().contains("test")); } + + //the first write fails and no retries are made + Mockito.verify(fileList, Mockito.times(1)).getWriterCreateMode(); + Mockito.verify(outStream, Mockito.times(1)).writeBytes(Mockito.anyString()); + Mockito.verify(fileList, Mockito.times(0)).getWriterAppendMode(); } - private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException { - long sleepTime = 1000L; - int iter = 0; - while 
(!isStreamingClosedProperly(fileListStreamer)) { - Thread.sleep(sleepTime); - iter++; - if (iter == 5) { - throw new IllegalStateException("File Streamer not getting closed till 5s."); + @Test + public void testReadWithDuplicateEntries() throws Exception { + conf = new HiveConf(); + String testEntry = "someEntry"; + int numUniqueEntries = 100; + Path testFilePath = new Path(new Path(TEST_DATA_DIR), "testFile"); + FileList fileList = new FileList(testFilePath, conf); + + for (int i = 1; i <= numUniqueEntries; i++) { + String currentUniqueEntry = testEntry + Integer.valueOf(i); + for (int duplicateFactor = 0; duplicateFactor < i; duplicateFactor++) { + fileList.add(currentUniqueEntry); } } + fileList.close(); + + int currentCount = 0; + while (fileList.hasNext()) { + String entry = fileList.next(); + Assert.assertEquals(testEntry + Integer.valueOf(currentCount + 1), entry); + currentCount++; + } + Assert.assertEquals(currentCount, numUniqueEntries); } - private Object[] setupAndGetTuple(int cacheSize, boolean lazyDataCopy) throws Exception { - HiveConf hiveConf = Mockito.mock(HiveConf.class); - Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET)).thenReturn(lazyDataCopy); - Path backingFile = new Path("/tmp/backingFile"); - LinkedBlockingQueue cache = new LinkedBlockingQueue<>(cacheSize); - FileListStreamer fileListStreamer = Mockito.spy(new FileListStreamer(cache, backingFile, hiveConf)); - FileList fileList = new FileList(backingFile, fileListStreamer, cache, hiveConf); - Mockito.doReturn(bufferedWriter).when(fileListStreamer).lazyInitWriter(); - Object[] tuple = new Object[] {fileList, fileListStreamer}; - return tuple; + @Test + public void testReadWithAllDistinctEntries() throws Exception { + conf = new HiveConf(); + String testEntry = "someEntry"; + int numUniqueEntries = 100; + Path testFilePath = new Path(new Path(TEST_DATA_DIR), "testFile"); + FileList fileList = new FileList(testFilePath, conf); + + for (int i = 1; i <= numUniqueEntries; i++) { + String currentUniqueEntry = testEntry + Integer.valueOf(i); + fileList.add(currentUniqueEntry); + } + fileList.close(); + + int currentCount = 0; + while (fileList.hasNext()) { + String entry = fileList.next(); + Assert.assertEquals(testEntry + Integer.valueOf(currentCount + 1), entry); + currentCount++; + } + Assert.assertEquals(currentCount, numUniqueEntries); } - private boolean isStreamingToFile(FileListStreamer fileListStreamer) { - return fileListStreamer.isInitialized() && fileListStreamer.isAlive(); + @Test + public void testWriteIntermediateRetry() throws Exception { + String testEntry = "someEntry"; + boolean retryOnCreate = false; // create passes, write operation keeps failing + FileList fileList = setupFileList(retryOnCreate); + final String retryExhaustedMsg = ErrorMsg.REPL_RETRY_EXHAUSTED + .format(testException.getMessage()); + + try{ + fileList.add(testEntry); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains(retryExhaustedMsg)); + } + + //file-creation done once + Mockito.verify(fileList, Mockito.times(1)).getWriterCreateMode(); + //writes fail after creation, subsequent attempts should only call append, verify 2 such attempts + Mockito.verify(fileList, Mockito.atLeast(2)).getWriterAppendMode(); + Mockito.verify(outStream, Mockito.atLeast(2)).writeBytes(Mockito.anyString()); } - private boolean isStreamingClosedProperly(FileListStreamer fileListStreamer) { - return fileListStreamer.isInitialized() && !fileListStreamer.isAlive() && fileListStreamer.isValid(); + 
private FileList setupFileList(boolean... retryParams) throws Exception { + HiveConf hiveConf = Mockito.mock(HiveConf.class); + FileSystem mockFs = Mockito.mock(FileSystem.class); + Path backingFile = Mockito.spy(new Path("/tmp/backingFile")); + FileList fileList = Mockito.spy(new FileList(backingFile, hiveConf)); + outStream = Mockito.spy(new FSDataOutputStream(null, null)); + Retryable retryable = Retryable.builder() + .withTotalDuration(60) + .withInitialDelay(1) + .withBackoff(1.0) + .withRetryOnException(IOException.class).build(); + + if(retryParams.length == 0) { + //setup for normal flow, without failures + Path noRetryPath = new Path(new Path(TEST_DATA_DIR), "noRetry"); + testFileStream = Mockito.spy(noRetryPath.getFileSystem(conf).create(noRetryPath)); + Mockito.doReturn(retryable) + .when(fileList).buildRetryable(); + Mockito.doReturn(true) + .when(hiveConf).getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY); + Mockito.doReturn(testFileStream).when(fileList).initWriter(); + } + else if (retryParams.length == 1) { + //setup for retries + Mockito.doReturn(true) + .when(hiveConf).getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY); + Mockito.doReturn(retryable) + .when(fileList).buildRetryable(); + Mockito.doReturn(mockFs) + .when(backingFile).getFileSystem(hiveConf); + + if(retryParams[0]) { + //setup for retry because of create-failure + Mockito.doReturn(false) + .when(mockFs).exists(backingFile); + Mockito.doThrow(testException) + .when(fileList).getWriterCreateMode(); + } + else { + //setup for retry because of failure during writes + Mockito.when(mockFs.exists(backingFile)) + .thenReturn(false) + .thenReturn(true); + Mockito.doReturn(outStream) + .when(fileList).getWriterAppendMode(); + Mockito.doReturn(outStream) + .when(fileList).getWriterCreateMode(); + Mockito.doThrow(testException) + .when(outStream).writeBytes(Mockito.anyString()); + } + } else if (retryParams.length == 2) { + //setup for failure but no retry + Mockito.doReturn(false) + .when(hiveConf).getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY); + Mockito.doReturn(outStream) + .when(fileList).getWriterCreateMode(); + Mockito.doThrow(testException) + .when(outStream).writeBytes(Mockito.anyString()); + } else if (retryParams.length == 3) { + //setup for abort case + Mockito.doReturn(true) + .when(hiveConf) + .getBoolVar(HiveConf.ConfVars.REPL_COPY_ITERATOR_RETRY); + Mockito.doReturn(outStream) + .when(fileList).initWriter(); + } + return fileList; } }
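
For reference, the copy-task builders earlier in this patch (externalTableCopyTasks and managedTableCopyTasks in ReplDumpWork and ReplLoadWork) consume a one-way iterator inside a retry block and skip as many entries as tasks already collected, so a retried pass does not add duplicate tasks. The sketch below models the same idea with a plain iterator and a fixed attempt count in place of Hive's Retryable; it is illustrative only.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: build "tasks" from an iterator inside a retry loop, skipping entries
// already consumed in earlier attempts so nothing is added twice.
public final class RetrySkipSketch {
  private static final int MAX_ATTEMPTS = 3;

  static List<String> buildTasks(Iterable<String> entries) throws Exception {
    List<String> tasks = new ArrayList<>();
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      Iterator<String> it = entries.iterator(); // models re-reading the backing file from the start
      int toSkip = tasks.size();                // everything added in earlier attempts
      try {
        while (it.hasNext()) {
          String entry = it.next();
          if (toSkip > 0) {
            toSkip--;
            continue;
          }
          tasks.add("task(" + entry + ")");
        }
        return tasks;
      } catch (RuntimeException e) {
        lastFailure = e; // e.g. an UncheckedIOException surfaced by the underlying reader
      }
    }
    throw new Exception("retries exhausted", lastFailure);
  }

  public static void main(String[] args) throws Exception {
    System.out.println(buildTasks(List.of("dir1", "dir2", "dir3")));
    // [task(dir1), task(dir2), task(dir3)]
  }
}

The skip counter is what keeps the resulting task list idempotent across attempts: however many times the read fails midway, each source entry contributes exactly one task.
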