diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a120b4573d..9b9dac28f5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -475,6 +475,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d",
         new TimeValidator(TimeUnit.DAYS),
         "TTL of dump dirs before cleanup."),
+    REPL_DUMP_COPY_DATA("hive.repl.dump.copydata", false,
+        "Indicates whether replication dump should contain actual data."),
     REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
         "Indicates whether replication dump only metadata information or data + metadata. \n"
             + "This config makes hive.repl.include.external.tables config ineffective."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 622433bb10..d1b0759626 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -304,7 +304,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
         NotificationEvent ev = evIter.next();
         lastReplId = ev.getEventId();
         Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
-        dumpEvent(ev, evRoot, cmRoot, hiveDb);
+        dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
       }

       replLogger.endLog(lastReplId.toString());
@@ -343,7 +343,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
           // Dump the table to be bootstrapped if required.
           if (shouldBootstrapDumpTable(table)) {
             HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-            dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+            dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb,
                 tableTuple);
           }
           if (tableList != null && isTableSatifiesConfig(table)) {
@@ -384,9 +384,10 @@ private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncremen
     return new Path(dumpRoot, dbName);
   }

-  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception {
     EventHandler.Context context = new EventHandler.Context(
         evRoot,
+        dumpRoot,
         cmRoot,
         db,
         conf,
@@ -507,7 +508,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
               LOG.debug("Adding table {} to external tables list", tblName);
               writer.dataLocationDump(tableTuple.object);
             }
-            dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+            dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb,
                 tableTuple);
           } catch (InvalidTableException te) {
             // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
@@ -565,7 +566,7 @@ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb)
     return dbRoot;
   }

-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId,
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId,
                  Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
@@ -583,8 +584,9 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
       tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+    Path replDataDir = new Path(dumproot, ReplUtils.DATA_COPY_ROOT_DIR_NAME);
     new TableExport(
-        exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
+        replDataDir, exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
     replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
   }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
index f2c8e8fd54..76c3a8a272 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index fc7f226d77..b7a5808852 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -79,6 +79,9 @@
   // Root directory for dumping bootstrapped tables along with incremental events dump.
   public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap";

+  // Root directory for dumping data.
+  public static final String DATA_COPY_ROOT_DIR_NAME = ".data";
+
   // Name of the directory which stores the list of tables included in the policy in case of table level replication.
   // One file per database, named after the db name. The directory is not created for db level replication.
   public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
@@ -236,7 +239,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
     return p -> {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+                && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
+                && !p.getName().equalsIgnoreCase(ReplUtils.DATA_COPY_ROOT_DIR_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -246,7 +250,8 @@ public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
   public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
     return p -> {
       try {
-        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
+                && !p.getName().equalsIgnoreCase(ReplUtils.DATA_COPY_ROOT_DIR_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 9e24799382..1d70f909fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -50,6 +50,7 @@
 // TODO: this object is created once to call one method and then immediately destroyed.
 // So it's basically just a roundabout way to pass arguments to a static method. Simplify?
 class PartitionExport {
+  private final Path replDataDir;
   private final Paths paths;
   private final PartitionIterable partitionIterable;
   private final String distCpDoAsUser;
@@ -61,8 +62,9 @@
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;

-  PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
+  PartitionExport(Path replDataDir, Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
                   HiveConf hiveConf, MmContext mmCtx) {
+    this.replDataDir = replDataDir;
     this.paths = paths;
     this.partitionIterable = partitionIterable;
     this.distCpDoAsUser = distCpDoAsUser;
@@ -115,7 +117,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException
           List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
               forReplicationSpec, hiveConf);
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
+          new FileOperations(replDataDir, dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
               .export(forReplicationSpec);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 01b7fdc4b6..04387b5ca6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -57,6 +57,7 @@
   private static final Logger logger = LoggerFactory.getLogger(TableExport.class);

   private TableSpec tableSpec;
+  private Path replDataDir;
   private final ReplicationSpec replicationSpec;
   private final Hive db;
   private final String distCpDoAsUser;
@@ -65,6 +66,11 @@
   private final MmContext mmCtx;

   public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
+      String distCpDoAsUser, HiveConf conf, MmContext mmCtx) {
+    this(null, paths, tableSpec, replicationSpec, db, distCpDoAsUser, conf, mmCtx);
+  }
+
+  public TableExport(Path replDataDir, Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
       String distCpDoAsUser, HiveConf conf, MmContext mmCtx) {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
@@ -78,6 +84,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication
       this.tableSpec.tableHandle.setStatsStateLikeNewTable();
     }

+    this.replDataDir = replDataDir;
     this.db = db;
     this.distCpDoAsUser = distCpDoAsUser;
     this.conf = conf;
@@ -154,13 +161,13 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
         throw new IllegalStateException("partitions cannot be null for partitionTable :"
             + tableSpec.getTableName().getTable());
       }
-      new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec);
+      new PartitionExport(replDataDir, paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec);
     } else {
       List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
           replicationSpec, conf);

       // this is the data copy
-      new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
+      new FileOperations(replDataDir, dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
           .export(replicationSpec);
       }
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index b9967031cd..5fa7e7864c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -17,15 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;

+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.security.auth.login.LoginException;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
 abstract class AbstractEventHandler implements EventHandler {
   static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class);
   static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance();
@@ -71,4 +82,27 @@ public long fromEventId() {
   public long toEventId() {
     return event.getEventId();
   }
+
+  public void writeFileEntry(String file, BufferedWriter fileListWriter, Context withinContext)
+      throws IOException, LoginException {
+    HiveConf hiveConf = withinContext.hiveConf;
+    boolean copyActualData = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_COPY_DATA);
+    String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    if (copyActualData) {
+      Path dataPath = new Path(withinContext.dumpRoot.toString(), ReplUtils.DATA_COPY_ROOT_DIR_NAME);
+      List<Path> dataPathList = new LinkedList<>();
+      String[] decodedURISplits = ReplChangeManager.decodeFileUri(file);
+      Path srcDataFile = new Path(decodedURISplits[0]);
+      dataPathList.add(srcDataFile);
+      FileOperations fileOperations = new FileOperations(dataPath, dataPathList, dataPath, distCpDoAsUser, hiveConf, null);
+      Path targetPath = fileOperations.getTargetDataPath(srcDataFile, dataPath);
+      fileOperations.copyOneDataPath(srcDataFile, targetPath);
+      targetPath = fileOperations.getPathWithSchemeAndAuthority(targetPath, srcDataFile);
+      String encodedTargetPath = ReplChangeManager.encodeFileUri(
+          targetPath.toString(), decodedURISplits[1], decodedURISplits[3]);
+      fileListWriter.write(encodedTargetPath + "\n");
+    } else {
+      fileListWriter.write(file.toString() + "\n");
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 42e74b37d9..34abee1af1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -110,8 +110,7 @@ public void handle(Context withinContext) throws Exception {
       // encoded filename/checksum of files, write into _files
       try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
         for (String file : files) {
-          fileListWriter.write(file);
-          fileListWriter.newLine();
+          super.writeFileEntry(file, fileListWriter, withinContext);
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 7d7dc26a25..1aa64336fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.fs.FileSystem;
+
+import javax.security.auth.login.LoginException;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -60,17 +62,19 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc
     return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
   }

-  private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath) throws IOException {
+  private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
+      throws IOException, LoginException {
     // encoded filename/checksum of files, write into _files
     try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
       for (String file : files) {
-        fileListWriter.write(file + "\n");
+        super.writeFileEntry(file, fileListWriter, withinContext);
       }
     }
   }

   private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
-      List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+      List<Partition> qlPtns, List<List<String>> fileListArray)
+      throws IOException, SemanticException, LoginException {
     if (fileListArray == null || fileListArray.isEmpty()) {
       return;
     }
@@ -96,7 +100,8 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met
   }

   private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
-      List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+      List<Partition> qlPtns, List<List<String>> fileListArray)
+      throws IOException, SemanticException, LoginException {
     Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName());
     Context context = new Context(withinContext);
     context.setEventRoot(newPath);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 837d51c8c8..61bb39d9b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -85,7 +85,7 @@ public void handle(Context withinContext) throws Exception {
     // encoded filename/checksum of files, write into _files
     try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
       for (String file : files) {
-        fileListWriter.write(file + "\n");
+        super.writeFileEntry(file, fileListWriter, withinContext);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index 7d00f89a5b..74a16b41aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -36,6 +36,7 @@
   DumpType dumpType();

   class Context {
+    Path dumpRoot;
     Path eventRoot;
     final Path cmRoot;
     final Hive db;
@@ -45,8 +46,9 @@
     final ReplScope oldReplScope;
     private Set<String> tablesForBootstrap;

-    public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec,
+    public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec,
                    ReplScope replScope, ReplScope oldReplScope, Set<String> tablesForBootstrap) {
+      this.dumpRoot = dumpRoot;
       this.eventRoot = eventRoot;
       this.cmRoot = cmRoot;
       this.db = db;
@@ -58,6 +60,7 @@ public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, Replicat
     }

     public Context(Context other) {
+      this.dumpRoot = other.dumpRoot;
       this.eventRoot = other.eventRoot;
       this.cmRoot = other.cmRoot;
       this.db = other.db;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 5a18d573cf..44fbcf5898 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -97,7 +97,7 @@ public void handle(Context withinContext) throws Exception {
     // encoded filename/checksum of files, write into _files
     try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
       for (String file : files) {
-        fileListWriter.write(file + "\n");
+        super.writeFileEntry(file, fileListWriter, withinContext);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index fc5419ce3f..dc867f62fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -21,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;

@@ -55,13 +56,21 @@
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
   private final List<Path> dataPathList;
   private final Path exportRootDataDir;
+  private final Path replDataDir;
   private final String distCpDoAsUser;
   private HiveConf hiveConf;
   private FileSystem exportFileSystem, dataFileSystem;
   private final MmContext mmCtx;

   public FileOperations(List<Path> dataPathList, Path exportRootDataDir, String distCpDoAsUser,
+                        HiveConf hiveConf, MmContext mmCtx) throws IOException {
+    this(null, dataPathList, exportRootDataDir, distCpDoAsUser,
+        hiveConf, mmCtx);
+  }
+
+  public FileOperations(Path replDataDir, List<Path> dataPathList, Path exportRootDataDir, String distCpDoAsUser,
                         HiveConf hiveConf, MmContext mmCtx) throws IOException {
+    this.replDataDir = replDataDir;
     this.dataPathList = dataPathList;
     this.exportRootDataDir = exportRootDataDir;
     this.distCpDoAsUser = distCpDoAsUser;
@@ -76,13 +85,20 @@ public FileOperations(List<Path> dataPathList, Path exportRootDataDir, String di
   }

   public void export(ReplicationSpec forReplicationSpec) throws Exception {
+    boolean copyDataFiles = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_COPY_DATA);
     if (forReplicationSpec.isLazy()) {
-      exportFilesAsList();
+      exportFilesAsList(replDataDir, copyDataFiles);
     } else {
       copyFiles();
     }
   }

+  public Path getTargetDataPath(Path srcDataFilePath, Path replDataDir) {
+    String srcFileRelativePath = srcDataFilePath.toUri().getPath();
+    Path targetDataPath = new Path(replDataDir, srcFileRelativePath.substring(1));
+    return targetDataPath;
+  }
+
   /**
    * This writes the actual data in the exportRootDataDir from the source.
    */
@@ -96,7 +112,7 @@ private void copyFiles() throws IOException, LoginException {
     }
   }

-  private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException {
+  public void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException {
     FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath);
     List<Path> srcPaths = new ArrayList<>();
     for (FileStatus fileStatus : fileStatuses) {
@@ -105,6 +121,11 @@ private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, Log
     new CopyUtils(distCpDoAsUser, hiveConf, toPath.getFileSystem(hiveConf)).doCopy(toPath, srcPaths);
   }

+  private void copyOneDataFilePath(Path fromPath, Path toPath) throws IOException, LoginException {
+    List<Path> srcPaths = new ArrayList<>();
+    srcPaths.add(fromPath);
+    new CopyUtils(distCpDoAsUser, hiveConf, toPath.getFileSystem(hiveConf)).doCopy(toPath, srcPaths);
+  }

   private void copyMmPath() throws LoginException, IOException {
     ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
@@ -146,7 +167,7 @@ private void copyMmPath() throws LoginException, IOException {
    * The data export here is a list of files either in table/partition that are written to the _files
    * in the exportRootDataDir provided.
    */
-  private void exportFilesAsList() throws SemanticException, IOException, LoginException {
+  private void exportFilesAsList(Path dataRootDir, boolean copyDataFiles) throws SemanticException, IOException, LoginException {
     if (dataPathList.isEmpty()) {
       return;
     }
@@ -156,18 +177,19 @@ private void exportFilesAsList() throws SemanticException, IOException, LoginExc
     // This is only called for replication that handles MM tables; no need for mmCtx.
     try (BufferedWriter writer = writer()) {
       for (Path dataPath : dataPathList) {
-        writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        writeFilesList(dataRootDir, copyDataFiles, listFilesInDir(dataPath), writer,
+            AcidUtils.getAcidSubDir(dataPath));
       }
       done = true;
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
-        logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+        logger.error("exporting data files in dir : " + dataPathList + " to " + dataRootDir + " failed");
         throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
       }
       repeat++;
       logger.info("writeFilesList failed", e);
       if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) {
-        logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+        logger.error("exporting data files in dir : " + dataPathList + " to " + dataRootDir + " failed");
         throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
       }

@@ -191,25 +213,44 @@ private void exportFilesAsList() throws SemanticException, IOException, LoginExc
     }
   }

-  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
-      throws IOException {
+  private void writeFilesList(Path dumpDataDir, boolean copyData, FileStatus[] fileStatuses,
+      BufferedWriter writer, String encodedSubDirs) throws IOException, LoginException {
     for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
         // Write files inside the sub-directory.
         Path subDir = fileStatus.getPath();
-        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+        writeFilesList(dumpDataDir, copyData, listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
       } else {
-        writer.write(encodedUri(fileStatus, encodedSubDirs));
-        writer.newLine();
+        Path currentFilePath = fileStatus.getPath();
+        Path listedFilePath = currentFilePath;
+        if (copyData && dumpDataDir != null) {
+          Path targetFilePath = getTargetDataPath(currentFilePath, dumpDataDir);
+          copyOneDataFilePath(currentFilePath, targetFilePath);
+          listedFilePath = getPathWithSchemeAndAuthority(targetFilePath, currentFilePath);
+        }
+        if (isValidFileEntry(fileStatus.getPath())) {
+          writer.write(encodedUri(listedFilePath, encodedSubDirs));
+          writer.newLine();
+        }
       }
     }
   }

+  public Path getPathWithSchemeAndAuthority(Path targetFilePath, Path currentFilePath) {
+    if (targetFilePath.toUri().getScheme() == null) {
+      URI currentURI = currentFilePath.toUri();
+      targetFilePath = new Path(currentURI.getScheme(), currentURI.getAuthority(), targetFilePath.toUri().getPath());
+    }
+    return targetFilePath;
+  }
+
   private FileStatus[] listFilesInDir(Path path) throws IOException {
-    return dataFileSystem.listStatus(path, p -> {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    });
+    return dataFileSystem.listStatus(path);
+  }
+
+  private boolean isValidFileEntry(Path filePath) {
+    String name = filePath.getName();
+    return !name.startsWith("_") && !name.startsWith(".");
   }

   private BufferedWriter writer() throws IOException {
@@ -233,8 +274,7 @@ private String encodedSubDir(String encodedParentDirs, Path subDir) {
     }
   }

-  private String encodedUri(FileStatus fileStatus, String encodedSubDir) throws IOException {
-    Path currentDataFilePath = fileStatus.getPath();
+  private String encodedUri(Path currentDataFilePath, String encodedSubDir) throws IOException {
     String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
     return ReplChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index aacd29591d..67255f2297 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -127,8 +127,8 @@ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throw
       private int tableDumpCount = 0;

       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
-          long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir,
+          long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
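
Illustration only, not part of the patch: the data-copy layout introduced above rests on two small helpers added to FileOperations. getTargetDataPath() re-roots a source data file under the dump's ".data" directory (ReplUtils.DATA_COPY_ROOT_DIR_NAME), and getPathWithSchemeAndAuthority() restores the source scheme/authority when the copied path carries none. The sketch below mirrors that logic under those assumptions; the class name and example paths are hypothetical.

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class ReplDataPathSketch {

  // Mirrors FileOperations.getTargetDataPath(): strip the leading '/' from the
  // source path and append the remainder to the replication data directory.
  static Path targetDataPath(Path srcDataFilePath, Path replDataDir) {
    String srcFileRelativePath = srcDataFilePath.toUri().getPath();
    return new Path(replDataDir, srcFileRelativePath.substring(1));
  }

  // Mirrors FileOperations.getPathWithSchemeAndAuthority(): if the target path is
  // scheme-less, borrow the scheme and authority of the original file.
  static Path withSchemeAndAuthority(Path targetFilePath, Path currentFilePath) {
    if (targetFilePath.toUri().getScheme() == null) {
      URI currentURI = currentFilePath.toUri();
      return new Path(currentURI.getScheme(), currentURI.getAuthority(), targetFilePath.toUri().getPath());
    }
    return targetFilePath;
  }

  public static void main(String[] args) {
    Path src = new Path("hdfs://ns1/warehouse/db1.db/t1/000000_0");   // hypothetical source data file
    Path dataRoot = new Path("/user/hive/repl/dump_1/.data");         // hypothetical <dumpRoot>/.data
    Path target = targetDataPath(src, dataRoot);
    // target    -> /user/hive/repl/dump_1/.data/warehouse/db1.db/t1/000000_0
    // qualified -> hdfs://ns1/user/hive/repl/dump_1/.data/warehouse/db1.db/t1/000000_0
    System.out.println("target    = " + target);
    System.out.println("qualified = " + withSchemeAndAuthority(target, src));
  }
}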