diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 39d876802a..7d2869004e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -66,7 +66,9 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -395,7 +397,8 @@ public boolean validate(Task task) { private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple) throws Throwable { HiveConf confTemp = new HiveConf(); confTemp.set("hive.repl.enable.move.optimization", "true"); - ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb, + Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb, null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), Collections.emptyList()); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); @@ -1362,6 +1365,70 @@ public void testDatabaseAlters() throws IOException { } } + @Test + public void testBootstrapWithDataInDumpDir() throws IOException { + String testName = "bootstrapLoadWithData"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_dupe"; + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + String[] unptn_data_1 = new String[] { "eleven", "twelve" }; + String[] unptn_data_2 = new String[] { "thirteen", "fourteen", "fifteen"}; + String[] unptn_allData = new String[] { "eleven", "twelve","thirteen", "fourteen", "fifteen"}; + String[] ptn_data_1 = new String[] { "one", "two", "three" }; + String[] ptn_data_2 = new String[] { "four", "five" }; + String[] empty = new String[] {}; + String unptned_fileName_1 = testName + "_unptn_1"; + String unptned_fileName_2 = testName + "_unptn_2"; + String ptned_fileName_1 = testName + "_ptn_1"; + String ptned_fileName_2 = testName + "_ptn_2"; + + String unptn_locn_1= new Path(TEST_PATH, unptned_fileName_1).toUri().getPath(); + String unptn_locn_2 = new Path(TEST_PATH, unptned_fileName_2).toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, ptned_fileName_1).toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, ptned_fileName_2).toUri().getPath(); + createTestDataFile(unptn_locn_1, unptn_data_1); + createTestDataFile(unptn_locn_2, unptn_data_2); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + verifySetup("SELECT * from " + dbName + ".unptned", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".ptned", empty, driverMirror); + run("LOAD DATA LOCAL INPATH '" + unptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + run("LOAD DATA LOCAL INPATH '" + unptn_locn_2 + "' INTO TABLE 
" + dbName + ".unptned", driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + Tuple dump = replDumpDb(dbName, null, null, null); + Path path = new Path(System.getProperty("test.warehouse.dir","")); + String tableRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "unptned"; + Path srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptned_fileName_1); + String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + + "unptned" + File.separator + EximUtil.DATA_PATH_NAME +File.separator + unptned_fileName_1; + Path tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); + //A file in table at src location should be copied to $dumplocation/hive///data/ + verifyChecksum(srcFileLocation, tgtFileLocation, true); + srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptned_fileName_2); + verifyChecksum(srcFileLocation, tgtFileLocation, false); + + String partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=1"; + srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptned_fileName_1); + tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + + File.separator + "ptned" + File.separator + "b=1" + File.separator + + EximUtil.DATA_PATH_NAME +File.separator + ptned_fileName_1; + tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); + //A partitioned file in table at src location should be copied to + // $dumplocation/hive//
//data/ + verifyChecksum(srcFileLocation, tgtFileLocation, true); + partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=2"; + srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptned_fileName_2); + loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId); + verifyChecksum(srcFileLocation, tgtFileLocation, false); + verifySetup("SELECT * from " + replDbName + ".unptned", unptn_allData, driver); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + } + @Test public void testIncrementalLoad() throws IOException { String testName = "incrementalLoad"; @@ -3548,6 +3615,14 @@ private void verifyResults(String[] data, IDriver myDriver) throws IOException { } } + private void verifyChecksum(Path sourceFilePath, Path targetFilePath, boolean shouldMatch) throws IOException { + FileSystem srcFS = sourceFilePath.getFileSystem(hconf); + FileSystem tgtFS = targetFilePath.getFileSystem(hconf); + if (shouldMatch) { + assertTrue(srcFS.getFileChecksum(sourceFilePath).equals(tgtFS.getFileChecksum(targetFilePath))); + } else + assertFalse(srcFS.getFileChecksum(sourceFilePath).equals(tgtFS.getFileChecksum(targetFilePath))); + } private List getOutput(IDriver myDriver) throws IOException { List results = new ArrayList<>(); myDriver.getResults(results); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index ebc590df05..29a9689f7f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -53,9 +53,7 @@ public int execute() { work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); - if (!tableExport.write()) { - throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); - } + tableExport.write(true); } catch (Exception e) { LOG.error("failed", e); setException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 470357af49..5718a3466b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -255,7 +255,7 @@ public int execute() { return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles); + new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath); // If a file is copied from CM path, then need to rename them using original source file name // This is needed to avoid having duplicate files in target if same event is applied twice @@ -324,10 +324,18 @@ public String getName() { public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean copyToMigratedTxnTable) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, + copyToMigratedTxnTable, true); + } + + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle, + boolean copyToMigratedTxnTable, boolean 
readSourceAsFileList) { Task copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + rcwork.setReadSrcAsFilesList(readSourceAsFileList); if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); @@ -339,15 +347,8 @@ public String getName() { // replace events getting replayed in the first incremental load. rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace()); LOG.debug("ReplCopyTask:\trcwork"); - if (replicationSpec.isLazy()) { - LOG.debug("ReplCopyTask:\tlazy"); - rcwork.setReadSrcAsFilesList(true); - - // It is assumed isLazy flag is set only for REPL LOAD flow. - // IMPORT always do deep copy. So, distCpDoAsUser will be null by default in ReplCopyWork. - String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - rcwork.setDistCpDoAsUser(distCpDoAsUser); - } + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + rcwork.setDistCpDoAsUser(distCpDoAsUser); copyTask = TaskFactory.get(rcwork, conf); } else { LOG.debug("ReplCopyTask:\tcwork"); @@ -360,4 +361,8 @@ public String getName() { HiveConf conf) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false); } + public static Task getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean readSourceAsFileList) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 977abb74cc..42b27b186c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -72,6 +74,7 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -119,18 +122,19 @@ public String getName() { public int execute() { try { Hive hiveDb = getHive(); - Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir()); - DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); + Path dumpBaseDir = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir()); + Path hiveDumpRoot = new Path(dumpBaseDir, 
ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; if (work.isBootStrapDump()) { - lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb); + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { - lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } - prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId))); + prepareReturnValues(Arrays.asList(dumpBaseDir.toUri().toString(), String.valueOf(lastReplId))); } catch (Exception e) { LOG.error("failed", e); setException(e); @@ -305,7 +309,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, cmRoot, hiveDb); + dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); } replLogger.endLog(lastReplId.toString()); @@ -344,8 +348,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple
tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, - tableTuple); + dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); } if (tableList != null && isTableSatifiesConfig(table)) { tableList.add(tableName); @@ -385,9 +388,10 @@ private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncremen return new Path(dumpRoot, dbName); } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception { + private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, + dumpRoot, cmRoot, db, conf, @@ -505,7 +509,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) LOG.debug("Adding table {} to external tables list", tblName); writer.dataLocationDump(tableTuple.object); } - dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, + dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. @@ -563,7 +567,7 @@ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) return dbRoot; } - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId, Hive hiveDb, HiveWrapper.Tuple
tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); @@ -582,10 +586,19 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); - + List replPathMappings = new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); + if (Utils.shouldDumpMetaDataOnly(tuple.object, conf)) { + return; + } + for (ReplPathMapping replPathMapping: replPathMappings) { + Task copyTask = ReplCopyTask.getLoadCopyTask( + tuple.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf, false); + this.addDependentTask(copyTask); + LOG.info("Scheduled a repl copy task from [{}] to [{}]", + replPathMapping.getSrcPath(), replPathMapping.getTargetPath()); + } } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { @@ -654,7 +667,7 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx private ReplicationSpec getNewReplicationSpec(String evState, String objState, boolean isMetadataOnly) { - return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true); + return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true); } private String getNextDumpDir() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 11597740e2..05a590a189 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -242,9 +243,9 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), - sourceWarehousePartitionLocation, + new Path(sourceWarehousePartitionLocation, EximUtil.DATA_PATH_NAME), stagingDir, - context.hiveConf + context.hiveConf, false ); Task movePartitionTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 65588fdbe9..82a30319b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -296,7 +296,7 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent + table.getCompleteName() + " with source location: " + dataPath.toString() + " and target location " + tgtPath.toString()); - Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); 
+ Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java index f2c8e8fd54..76c3a8a272 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index fc7f226d77..1040f0115a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; @@ -79,6 +80,9 @@ // Root directory for dumping bootstrapped tables along with incremental events dump. public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap"; + // Root base directory name for hive. + public static final String REPL_HIVE_BASE_DIR = "hive"; + // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. 
public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; @@ -236,7 +240,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { return p -> { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) - && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } @@ -246,7 +251,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) { public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) { return p -> { try { - return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index f588b0d065..e65cbf50e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -157,6 +157,40 @@ public void setOpenTxnTask(Task openTxnTask) { } } + /** + * Wrapper class for mapping replication source and target path for copying data. + */ + public static class ReplPathMapping { + private Path srcPath; + private Path tgtPath; + + public ReplPathMapping(Path srcPath, Path tgtPath) { + if (srcPath == null) { + throw new IllegalArgumentException("Source Path can not be null."); + } + this.srcPath = srcPath; + if (tgtPath == null) { + throw new IllegalArgumentException("Target Path can not be null."); + } + this.tgtPath = tgtPath; + } + + public Path getSrcPath() { + return srcPath; + } + + public void setSrcPath(Path srcPath) { + this.srcPath = srcPath; + } + + public Path getTargetPath() { + return tgtPath; + } + + public void setTargetPath(Path targetPath) { + this.tgtPath = targetPath; + } + } private EximUtil() { } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 810a4c5284..39912a0a98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -384,6 +385,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { // Make fully qualified path for further use. loadPath = fs.makeQualified(loadPath); + // Load path points to hive load path by default. + loadPath = new Path(loadPath, ReplUtils.REPL_HIVE_BASE_DIR); if (!fs.exists(loadPath)) { // supposed dump path does not exist. 
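Illustrative sketch, not part of the diff: how the pieces introduced above are wired together on the dump side. TableExport.write(false) returns EximUtil.ReplPathMapping pairs, and ReplDumpTask.dumpTable turns each pair into a copy task through the new ReplCopyTask.getLoadCopyTask(..., readSourceAsFileList) overload. The identifiers mappings, spec, conf and rootTask below are placeholders, not names from the patch.

    // One copy task per source/target pair; readSourceAsFileList = false makes ReplCopyTask
    // copy the directory contents directly instead of reading a _files listing.
    for (EximUtil.ReplPathMapping mapping : mappings) {
      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
          spec,                     // ReplicationSpec of the table being dumped
          mapping.getSrcPath(),     // table/partition data location at the source
          mapping.getTargetPath(),  // <dump dir>/hive/<db>/<table>/data
          conf,
          false);                   // readSourceAsFileList
      rootTask.addDependentTask(copyTask);
    }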
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 99b09e5ea9..13e4a8cbee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -42,7 +42,6 @@ private String eventId = null; private String currStateId = null; private boolean isNoop = false; - private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in. private boolean isReplace = true; // default is that the import mode is insert overwrite private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables. //TxnIds snapshot @@ -60,7 +59,6 @@ EVENT_ID("repl.event.id"), CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY), NOOP("repl.noop"), - LAZY("repl.lazy"), IS_REPLACE("repl.is.replace"), VALID_WRITEID_LIST("repl.valid.writeid.list"), VALID_TXN_LIST("repl.valid.txnid.list") @@ -117,18 +115,17 @@ public ReplicationSpec(){ } public ReplicationSpec(String fromId, String toId) { - this(true, false, fromId, toId, false, true, false); + this(true, false, fromId, toId, false, false); } public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, - boolean isNoop, boolean isLazy, boolean isReplace) { + boolean isNoop, boolean isReplace) { this.isInReplicationScope = isInReplicationScope; this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; this.isNoop = isNoop; - this.isLazy = isLazy; this.isReplace = isReplace; this.specType = Type.DEFAULT; } @@ -149,7 +146,6 @@ public ReplicationSpec(Function keyFetcher) { this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString()); this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString()); this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); - this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString()); @@ -324,20 +320,6 @@ public void setNoop(boolean isNoop) { this.isNoop = isNoop; } - /** - * @return whether or not the current replication action is set to be lazy - */ - public boolean isLazy() { - return isLazy; - } - - /** - * @param isLazy whether or not the current replication action should be lazy - */ - public void setLazy(boolean isLazy){ - this.isLazy = isLazy; - } - /** * @return the WriteIds snapshot for the current ACID/MM table being replicated */ @@ -385,8 +367,6 @@ public String get(KEY key) { return getCurrentReplicationState(); case NOOP: return String.valueOf(isNoop()); - case LAZY: - return String.valueOf(isLazy()); case IS_REPLACE: return String.valueOf(isReplace()); case VALID_WRITEID_LIST: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 73c863ed1a..0cf53c8358 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -72,28 +72,34 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio // 
changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm public void copyAndVerify(Path destRoot, - List srcFiles) throws IOException, LoginException, HiveFatalException { - Map>> map = fsToFileMap(srcFiles, destRoot); + List srcFiles, Path origSrcPtah) throws IOException, LoginException, HiveFatalException { UserGroupInformation proxyUser = getProxyUser(); + FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf); + boolean useRegularCopy = regularCopy(sourceFs, srcFiles); try { - for (Map.Entry>> entry : map.entrySet()) { - Map> destMap = entry.getValue(); - for (Map.Entry> destMapEntry : destMap.entrySet()) { - Path destination = destMapEntry.getKey(); - List fileInfoList = destMapEntry.getValue(); - // Get the file system again from cache. There is a chance that the file system stored in the map is closed. - // For instance, doCopyRetry closes the file system in case of i/o exceptions. - FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); - boolean useRegularCopy = regularCopy(sourceFs, fileInfoList); - - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { - LOG.error("Failed to create destination directory: " + destination); - throw new IOException("Destination directory creation failed"); + if (!useRegularCopy) { + srcFiles.clear(); + srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null)); + doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy); + } else { + Map>> map = fsToFileMap(srcFiles, destRoot); + for (Map.Entry>> entry : map.entrySet()) { + Map> destMap = entry.getValue(); + for (Map.Entry> destMapEntry : destMap.entrySet()) { + Path destination = destMapEntry.getKey(); + List fileInfoList = destMapEntry.getValue(); + // Get the file system again from cache. There is a chance that the file system stored in the map is closed. + // For instance, doCopyRetry closes the file system in case of i/o exceptions. + sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); + } + + // Copy files with retry logic on failure or source file is dropped or changed. + doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true); } - - // Copy files with retry logic on failure or source file is dropped or changed. - doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy); } } } finally { @@ -181,12 +187,12 @@ private void doCopyRetry(FileSystem sourceFs, List s continue; } Path srcPath = srcFile.getEffectivePath(); - Path destPath = new Path(destination, srcPath.getName()); - if (destinationFs.exists(destPath)) { + //Path destPath = new Path(destination, srcPath.getName()); + if (destinationFs.exists(destination)) { // If destination file is present and checksum of source mismatch, then retry copy. if (isSourceFileMismatch(sourceFs, srcFile)) { // Delete the incorrectly copied file and retry with CM path - destinationFs.delete(destPath, true); + destinationFs.delete(destination, true); srcFile.setIsUseSourcePath(false); } else { // If the retry logic is reached after copy error, then include the copied file as well. 
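Illustrative sketch, not part of the diff: the CopyUtils.copyAndVerify change above reduces to choosing between one directory-level copy and the previous per-file flow. copyPerDestinationWithRetry is a placeholder for the existing per-destination loop; the other calls mirror methods already used in CopyUtils, and origSrcPtah keeps the parameter name from the patch.

    FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf);
    if (!regularCopy(sourceFs, srcFiles)) {
      // DistCp-style copy: hand the whole source directory to one copy job,
      // so the per-file list is replaced by the directory itself.
      srcFiles.clear();
      srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null));
      doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, false);
    } else {
      // Regular copy: group files by destination directory, create the directory if missing,
      // and copy file by file with the existing retry / CM-fallback logic.
      copyPerDestinationWithRetry(srcFiles, destRoot, proxyUser); // placeholder for the loop shown above
    }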
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java index 4b2812eaa7..7d641961d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -43,7 +43,6 @@ public ReplicationSpec fromMetaStore() throws HiveException { "replv2", "will-be-set", false, - true, false ); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 9e24799382..0bd57df721 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; @@ -73,8 +75,9 @@ this.callersSession = SessionState.get(); } - void write(final ReplicationSpec forReplicationSpec) throws InterruptedException, HiveException { + List write(final ReplicationSpec forReplicationSpec) throws InterruptedException, HiveException { List> futures = new LinkedList<>(); + List replCopyPathMappings = new LinkedList<>(); ExecutorService producer = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build()); futures.add(producer.submit(() -> { @@ -116,8 +119,10 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) - .export(forReplicationSpec); + .export(); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); + replCopyPathMappings.add(new ReplPathMapping(partition.getDataLocation(), + new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME))); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -134,5 +139,6 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException } // may be drive this via configuration as well. 
consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + return replCopyPathMappings; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 97a1dd31a7..81f377b484 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -86,20 +88,21 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.mmCtx = mmCtx; } - public boolean write() throws SemanticException { + public List write(boolean isExportTask) throws SemanticException { + List replPathMappings = Collections.emptyList(); if (tableSpec == null) { writeMetaData(null); - return true; } else if (shouldExport()) { PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly() && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { - writeData(withPartitions); + replPathMappings = writeData(withPartitions); } - return true; + } else if (isExportTask) { + throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } - return false; + return replPathMappings; } private PartitionIterable getPartitions() throws SemanticException { @@ -149,20 +152,23 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio } } - private void writeData(PartitionIterable partitions) throws SemanticException { + private List writeData(PartitionIterable partitions) throws SemanticException { + List replCopyPathMappings = new LinkedList<>(); try { if (tableSpec.tableHandle.isPartitioned()) { if (partitions == null) { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.getTableName().getTable()); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); + replCopyPathMappings = new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); + replCopyPathMappings.add(new ReplPathMapping(tableSpec.tableHandle.getDataLocation(), paths.dataExportDir())); new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + .export(); } + return replCopyPathMappings; } catch (Exception e) { throw new SemanticException(e.getMessage(), e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index b9967031cd..7fe9681e15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -17,15 +17,29 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.login.LoginException; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + abstract class AbstractEventHandler implements EventHandler { static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class); static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance(); @@ -71,4 +85,31 @@ public long fromEventId() { public long toEventId() { return event.getEventId(); } + + public void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter, + Context withinContext) throws IOException, LoginException { + HiveConf hiveConf = withinContext.hiveConf; + String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + if (Utils.shouldDumpMetaDataOnly(table, withinContext.hiveConf)) { + Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME); + List dataPathList = new LinkedList<>(); + String[] decodedURISplits = ReplChangeManager.decodeFileUri(file); + String srcDataFile = decodedURISplits[0]; + Path srcDataPath = new Path(srcDataFile); + dataPathList.add(srcDataPath); + FileOperations fileOperations = new FileOperations(dataPathList, dataPath, distCpDoAsUser, hiveConf, + null); + String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName(); + String tableRelativePathExr = File.separator + table.getTableName() + File.separator; + String srcDataFileRelativePath = srcDataFile.substring(srcDataFile.indexOf(tableRelativePathExr) + + tableRelativePathExr.length()); + Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath); + fileOperations.copyOneDataPath(srcDataPath, targetPath); + String encodedTargetPath = ReplChangeManager.encodeFileUri( + targetPath.toString(), decodedURISplits[1], decodedURISplits[3]); + fileListWriter.write(encodedTargetPath + "\n"); + } else { + fileListWriter.write(file.toString() + "\n"); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 42e74b37d9..65034ea2fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -110,8 +110,7 @@ public void handle(Context withinContext) throws Exception { // encoded 
filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { for (String file : files) { - fileListWriter.write(file); - fileListWriter.newLine(); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 7d7dc26a25..a625f94572 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -32,11 +32,14 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.fs.FileSystem; + +import javax.security.auth.login.LoginException; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; @@ -60,17 +63,19 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); } - private void writeDumpFiles(Context withinContext, Iterable files, Path dataPath) throws IOException { + private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable files, Path dataPath) + throws IOException, LoginException { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List qlPtns, List> fileListArray) throws IOException, SemanticException { + List qlPtns, List> fileListArray) + throws IOException, SemanticException, LoginException { if (fileListArray == null || fileListArray.isEmpty()) { return; } @@ -86,17 +91,18 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met if ((null == qlPtns) || qlPtns.isEmpty()) { Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); - writeDumpFiles(withinContext, fileListArray.get(0), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName()); - writeDumpFiles(withinContext, fileListArray.get(idx), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath); } } } private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List qlPtns, List> fileListArray) throws IOException, SemanticException { + List qlPtns, List> fileListArray) + throws IOException, SemanticException, LoginException { Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName()); Context context = new Context(withinContext); context.setEventRoot(newPath); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 355374aeb8..5d03e8b8d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -84,7 +84,7 @@ public void handle(Context withinContext) throws Exception { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index 7d00f89a5b..74a16b41aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -36,6 +36,7 @@ DumpType dumpType(); class Context { + Path dumpRoot; Path eventRoot; final Path cmRoot; final Hive db; @@ -45,8 +46,9 @@ final ReplScope oldReplScope; private Set tablesForBootstrap; - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, + public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope, Set tablesForBootstrap) { + this.dumpRoot = dumpRoot; this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; @@ -58,6 +60,7 @@ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, Replicat } public Context(Context other) { + this.dumpRoot = other.dumpRoot; this.eventRoot = other.eventRoot; this.cmRoot = other.cmRoot; this.db = other.db; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 5a18d573cf..8c83f0ff24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -97,7 +97,7 @@ public void handle(Context withinContext) throws Exception { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + super.writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index fc5419ce3f..95b262331b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStreamWriter; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -49,6 +50,7 @@ import static org.apache.hadoop.hive.ql.ErrorMsg.FILE_NOT_FOUND; + //TODO: this object is created once to call one method and then immediately destroyed. 
//So it's basically just a roundabout way to pass arguments to a static method. Simplify? public class FileOperations { @@ -75,28 +77,15 @@ public FileOperations(List dataPathList, Path exportRootDataDir, String di exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } - public void export(ReplicationSpec forReplicationSpec) throws Exception { - if (forReplicationSpec.isLazy()) { - exportFilesAsList(); - } else { - copyFiles(); - } - } - - /** - * This writes the actual data in the exportRootDataDir from the source. - */ - private void copyFiles() throws IOException, LoginException { + public void export() throws Exception { if (mmCtx == null) { - for (Path dataPath : dataPathList) { - copyOneDataPath(dataPath, exportRootDataDir); - } + validateSrcPathListExists(); } else { copyMmPath(); } } - private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + public void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); List srcPaths = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { @@ -141,68 +130,12 @@ private void copyMmPath() throws LoginException, IOException { } } - /** - * This needs the root data directory to which the data needs to be exported to. - * The data export here is a list of files either in table/partition that are written to the _files - * in the exportRootDataDir provided. - */ - private void exportFilesAsList() throws SemanticException, IOException, LoginException { - if (dataPathList.isEmpty()) { - return; - } - boolean done = false; - int repeat = 0; - while (!done) { - // This is only called for replication that handles MM tables; no need for mmCtx. - try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); - } - done = true; - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); - } - repeat++; - logger.info("writeFilesList failed", e); - if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); - } - - int sleepTime = FileUtils.getSleepTime(repeat - 1); - logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - logger.info("thread sleep interrupted", timerEx.getMessage()); - } - - // in case of io error, reset the file system object - FileSystem.closeAllForUGI(Utils.getUGI()); - dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); - exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); - Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportPath)) { - exportFileSystem.delete(exportPath, true); - } - } - } - } - - private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) - throws IOException { - for (FileStatus fileStatus : fileStatuses) { - if (fileStatus.isDirectory()) { - // Write files inside the sub-directory. 
- Path subDir = fileStatus.getPath(); - writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir)); - } else { - writer.write(encodedUri(fileStatus, encodedSubDirs)); - writer.newLine(); - } + public Path getPathWithSchemeAndAuthority(Path targetFilePath, Path currentFilePath) { + if (targetFilePath.toUri().getScheme() == null) { + URI currentURI = currentFilePath.toUri(); + targetFilePath = new Path(currentURI.getScheme(), currentURI.getAuthority(), targetFilePath.toUri().getPath()); } + return targetFilePath; } private FileStatus[] listFilesInDir(Path path) throws IOException { @@ -212,30 +145,20 @@ private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, St }); } - private BufferedWriter writer() throws IOException { - Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportToFile)) { - throw new IllegalArgumentException( - exportToFile.toString() + " already exists and cant export data from path(dir) " - + dataPathList); + /** + * Since the bootstrap will do table directory level copy, need to check for existence of src path. + */ + private void validateSrcPathListExists() throws IOException, LoginException { + if (dataPathList.isEmpty()) { + return; } - logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile); - return new BufferedWriter( - new OutputStreamWriter(exportFileSystem.create(exportToFile)) - ); - } - - private String encodedSubDir(String encodedParentDirs, Path subDir) { - if (null == encodedParentDirs) { - return subDir.getName(); - } else { - return encodedParentDirs + Path.SEPARATOR + subDir.getName(); + try { + for (Path dataPath : dataPathList) { + listFilesInDir(dataPath); + } + } catch (FileNotFoundException e) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); } } - - private String encodedUri(FileStatus fileStatus, String encodedSubDir) throws IOException { - Path currentDataFilePath = fileStatus.getPath(); - String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem); - return ReplChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir); - } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index aacd29591d..67255f2297 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -127,8 +127,8 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw private int tableDumpCount = 0; @Override - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, - long lastReplId, Hive hiveDb, HiveWrapper.Tuple
tuple) + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir,
+ long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; if (tableDumpCount > 1) {
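A short summary of the dump layout this patch introduces, pieced together from ReplDumpTask, ReplicationSemanticAnalyzer and the new test. It assumes ReplUtils.REPL_HIVE_BASE_DIR resolves to "hive" and EximUtil.DATA_PATH_NAME to "data"; db, table and dumpLocation are placeholders.

    // Dump side (ReplDumpTask.execute): Hive metadata and data now live under a "hive" base dir,
    // while REPL DUMP keeps returning the parent dump directory to the caller.
    Path dumpBaseDir  = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
    Path hiveDumpRoot = new Path(dumpBaseDir, ReplUtils.REPL_HIVE_BASE_DIR);          // .../<dumpId>/hive
    // Bootstrapped table data is copied to .../<dumpId>/hive/<db>/<table>/data/<files>
    Path tableDataDir = new Path(hiveDumpRoot, db + "/" + table + "/" + EximUtil.DATA_PATH_NAME);

    // Load side (ReplicationSemanticAnalyzer, and getReplLoadRootTask in the test): callers
    // append the same base dir to the returned dump location before loading.
    Path loadPath = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);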