diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index ce683c8a8d..e99d26f4c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -32,11 +32,14 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.CopyWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.Lists; + /** * CopyTask implementation. **/ @@ -62,14 +65,22 @@ public int execute(DriverContext driverContext) { protected int copyOnePath(Path fromPath, Path toPath) { FileSystem dstFs = null; try { - Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath); + Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath); console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString()); FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - - FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs()); + + /* TODO: this used to be here, but it is very fragile given that Hive supports nested + directories in the tables. If needed, the caller should pass a flag explicitly + telling us if the directories in the input are data, or parent of data. + if (srcs != null && srcs.length == 1 && srcs[0].isDirectory()) { + srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + */ + + FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter()); if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -106,40 +117,6 @@ protected int copyOnePath(Path fromPath, Path toPath) { } } - // Note: initially copied from LoadSemanticAnalyzer. - private static FileStatus[] matchFilesOrDir( - FileSystem fs, Path path, boolean isSourceMm) throws IOException { - if (!fs.exists(path)) return null; - if (!isSourceMm) return matchFilesOneDir(fs, path, null); - // Note: this doesn't handle list bucketing properly; neither does the original code. 
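The removal of the MM-aware matchFilesOrDir/matchFilesOneDir helpers continues below; after it, copyOnePath resolves its sources with a single globStatus call filtered by the existing EximPathFilter, and a matched directory is copied as a directory instead of being expanded one level. A minimal sketch of that matching behaviour, using a stand-in filter (NOT_HIDDEN is illustrative, not the real EximPathFilter):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

final class GlobMatchSketch {
  // Stand-in for EximPathFilter: skip hidden files and directories.
  private static final PathFilter NOT_HIDDEN =
      p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

  static FileStatus[] matchSources(Configuration conf, Path fromPath) throws IOException {
    FileSystem srcFs = fromPath.getFileSystem(conf);
    // globStatus expands any wildcards in fromPath but, unlike the removed helper,
    // never descends into a matched directory; the TODO in copyOnePath explains why.
    return srcFs.globStatus(fromPath, NOT_HIDDEN);
  }
}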
- FileStatus[] mmDirs = fs.listStatus(path, new JavaUtils.AnyIdDirFilter()); - if (mmDirs == null || mmDirs.length == 0) return null; - List allFiles = new ArrayList(); - for (FileStatus mmDir : mmDirs) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath()); - } - matchFilesOneDir(fs, mmDir.getPath(), allFiles); - } - return allFiles.toArray(new FileStatus[allFiles.size()]); - } - - private static FileStatus[] matchFilesOneDir( - FileSystem fs, Path path, List result) throws IOException { - FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); - if (srcs != null && srcs.length == 1) { - if (srcs[0].isDirectory()) { - srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } - } - if (result != null && srcs != null) { - for (int i = 0; i < srcs.length; ++i) { - result.add(srcs[i]); - } - } - return srcs; - } - private static final class EximPathFilter implements PathFilter { @Override public boolean accept(Path p) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index aba65918f8..119d7923a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -47,15 +47,13 @@ public String getName() { protected int execute(DriverContext driverContext) { try { // Also creates the root directory - TableExport.Paths exportPaths = - new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), - conf, false); + TableExport.Paths exportPaths = new TableExport.Paths( + work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); work.acidPostProcess(db); - TableExport tableExport = new TableExport( - exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf - ); + TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), + work.getReplicationSpec(), db, null, conf, work.getMmContext()); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5fbe045df5..da1382ded6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -4391,32 +4391,29 @@ private static void deleteUncommitedFile(Path childPath, FileSystem fs) } /** - * @return the complete list of valid MM directories under a table/partition path; null - * if the entire directory is valid (has no uncommitted/temporary files). + * @return the complete list of valid MM directories under a table/partition path. */ public static List getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf, ValidWriteIdList validWriteIdList, int lbLevels) throws IOException { Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", path); // NULL means this directory is entirely valid. - List result = null; FileSystem fs = path.getFileSystem(conf); FileStatus[] children = (lbLevels == 0) ? 
fs.listStatus(path) : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*")); + List result = new ArrayList<>(children.length); for (int i = 0; i < children.length; ++i) { FileStatus file = children[i]; Path childPath = file.getPath(); + if (!file.isDirectory()) { + Utilities.FILE_OP_LOGGER.debug("Skipping non-directory {}", file.getPath()); + continue; + } Long writeId = JavaUtils.extractWriteId(childPath); - if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) { + if (writeId == null || !validWriteIdList.isWriteIdValid(writeId)) { Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath); - if (result == null) { - result = new ArrayList<>(children.length - 1); - for (int j = 0; j < i; ++j) { - result.add(children[j].getPath()); - } - } - } else if (result != null) { - result.add(childPath); + continue; } + result.add(childPath); } return result; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index ce0757cba2..910fad6aa4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -276,7 +277,9 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false - new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write(); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index d3c62a2775..399ca03d29 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -20,11 +20,15 @@ import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -32,8 +36,10 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import 
org.apache.hadoop.hive.ql.plan.ExportWork; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import javax.annotation.Nullable; + import java.util.Set; /** @@ -41,6 +47,7 @@ * */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { + private boolean isMmExport = false; ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -48,7 +55,9 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + Task task = analyzeExport(ast, null, db, conf, inputs, outputs); + isMmExport = task.getWork().getMmContext() != null; + rootTasks.add(task); } /** * @param acidTableName - table name in db.table format; not NULL if exporting Acid table @@ -80,12 +89,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { try { ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ + } catch (SemanticException sme) { + if (!replicationSpec.isInReplicationScope()) throw sme; + if ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException)) { // If we're in replication scope, it's possible that we're running the export long after // the table was dropped, so the table not existing currently or being a different kind of // table is not an error - it simply means we should no-op, and let a future export @@ -101,15 +108,26 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // All parsing is done, we're now good to start the export process TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + // Note: this tableExport is actually never used other than for auth, and another one is + // created when the task is executed. So, we don't care about the correct MM state here. + TableExport.AuthEntities authEntities = new TableExport( + exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; + MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); + + Utilities.FILE_OP_LOGGER.debug("Exporting table {} / {}: MM context {}", + ts.tableName, ts.tableHandle.getTableName(), mmCtx); // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); + ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec, + ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx); // Create an export task and add it as a root task - return TaskFactory.get(exportWork); + return TaskFactory.get(exportWork); + } + + @Override + public boolean hasTransactionalInQuery() { + return isMmExport; // Full ACID export goes thru UpdateDelete analyzer. 
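// Reporting MM exports as transactional lets the driver record a valid write-ID
// snapshot for the query before ExportTask runs; FileOperations.copyMmPath (further
// below in this patch) reads that snapshot via AcidUtils.getTableValidWriteIdList
// to decide which directories to dump.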
} } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index ac44be5e0b..31c56d7c50 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -382,7 +382,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId, boolean isSourceMm) { + Long writeId, int stmtId) { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); @@ -423,9 +423,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(dataPath, destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } LoadTableDesc loadTableWork = new LoadTableDesc( @@ -480,8 +478,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm, - Task commitTask) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, Task commitTask) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -518,9 +515,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } Task addPartTask = TaskFactory.get( @@ -834,8 +829,6 @@ private static void createRegularImportTasks( EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws HiveException, IOException, MetaException { - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); @@ -848,7 +841,7 @@ private static void createRegularImportTasks( org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, ict)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -860,8 +853,7 @@ private static void createRegularImportTasks( Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); 
checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, - isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -883,7 +875,7 @@ private static void createRegularImportTasks( AcidUtils.isInsertOnlyTable(tblDesc.getTblProps())); for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + replicationSpec, x, writeId, stmtId, ict)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -901,7 +893,7 @@ private static void createRegularImportTasks( FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, - writeId, stmtId, isSourceMm)); + writeId, stmtId)); } } x.getTasks().add(t); @@ -941,7 +933,6 @@ private static void createReplImportTasks( throws HiveException, URISyntaxException, IOException, MetaException { Task dropTblTask = null; - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -1028,14 +1019,14 @@ private static void createReplImportTasks( for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, ict)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId)); } } @@ -1061,7 +1052,7 @@ private static void createReplImportTasks( if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, ict)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1078,7 +1069,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, ict)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, 
addPartitionDesc, replicationSpec, ptn, x)); @@ -1104,7 +1095,7 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), - replicationSpec, x, writeId, stmtId, isSourceMm); + replicationSpec, x, writeId, stmtId); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 820046388a..70295da960 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -215,7 +215,7 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { return new UpdateDeleteSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 5844f3d97f..a06932611a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -25,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,8 @@ * it has a blocking queue that stores partitions to be dumped via a producer thread. * it has a worker thread pool that reads of the queue to perform the various tasks. */ +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; @@ -52,13 +56,15 @@ private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue queue; + private final MmContext mmCtx; PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, - HiveConf hiveConf) { + HiveConf hiveConf, MmContext mmCtx) { this.paths = paths; this.partitionIterable = partitionIterable; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); this.queue = new ArrayBlockingQueue<>(2 * nThreads); } @@ -101,7 +107,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException try { // this the data copy Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf) + new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(forReplicationSpec); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index abb2e8874b..6b563c44c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
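The constructor and writeData() hunks below complete the plumbing that ReplDumpTask and ExportTask start: one MmContext, created by the analyzer, rides the whole dump call chain untouched until it reaches FileOperations. Condensed, with names taken from the surrounding hunks (a reading aid, not additional patch code):

MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);   // null for non-MM tables
new TableExport(exportPaths, tableSpec, replicationSpec, db, distCpDoAsUser, conf, mmCtx)
    .write();
// Inside writeData():
//   partitioned:    new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx)
//                       .write(replicationSpec);
//   unpartitioned:  new FileOperations(tableSpec.tableHandle.getDataLocation(),
//                       paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
//                       .export(replicationSpec);
// PartitionExport then builds one FileOperations per partition with the same mmCtx,
// so FileOperations is the only component that actually consumes it.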
public class TableExport { private static final Logger logger = LoggerFactory.getLogger(TableExport.class); @@ -59,9 +62,10 @@ private final String distCpDoAsUser; private final HiveConf conf; private final Paths paths; + private final MmContext mmCtx; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -76,6 +80,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; this.paths = paths; + this.mmCtx = mmCtx; } public boolean write() throws SemanticException { @@ -147,11 +152,11 @@ private void writeData(PartitionIterable partitions) throws SemanticException { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { Path fromPath = tableSpec.tableHandle.getDataLocation(); // this is the data copy - new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf) + new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(replicationSpec); } } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 866d3513b1..0a09adb35d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -20,37 +20,49 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.ExportWork; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; + import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
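FileOperations is where the context is finally consumed. When mmCtx is set, copyMmPath (below) snapshots the table's committed write IDs, asks Utilities.getValidMmDirectoriesFromTableOrPart for the matching directories, and copies each one under a "not_"-prefixed name so the dumped tree no longer looks like an ACID base/delta layout; because of that rename the import side needs no MM special-casing, which is what lets the patch drop CopyWork's skip-source-MM-dirs flag. A condensed sketch of that path (separator handling elided), assuming a single committed delta named delta_0000005_0000005:

ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
Path fromPath = dataFileSystem.makeQualified(dataFileListPath);
// Since the Utilities change earlier in this patch, this always returns a list (never
// null): non-directories and directories with invalid write IDs are skipped.
List<Path> validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(
    fromPath, hiveConf, ids, mmCtx.getLbLevels());
for (Path validPath : validPaths) {
  // e.g. <table dir>/delta_0000005_0000005  ->  <export dir>/not_delta_0000005_0000005
  String mmChildPath = "not_" + validPath.toString().substring(fromPath.toString().length() + 1);
  Path destPath = new Path(exportRootDataDir, mmChildPath);
  exportFileSystem.mkdirs(destPath);
  copyOneDataPath(validPath, destPath);
}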
public class FileOperations { - private static Logger logger = LoggerFactory.getLogger(FileOperations.class); + private static Logger LOG = LoggerFactory.getLogger(FileOperations.class); private final Path dataFileListPath; private final Path exportRootDataDir; private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; + private final MmContext mmCtx; - public FileOperations(Path dataFileListPath, Path exportRootDataDir, - String distCpDoAsUser, HiveConf hiveConf) throws IOException { + public FileOperations(Path dataFileListPath, Path exportRootDataDir, String distCpDoAsUser, + HiveConf hiveConf, ExportWork.MmContext mmCtx) throws IOException { this.dataFileListPath = dataFileListPath; this.exportRootDataDir = exportRootDataDir; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; dataFileSystem = dataFileListPath.getFileSystem(hiveConf); exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } @@ -67,13 +79,40 @@ public void export(ReplicationSpec forReplicationSpec) throws Exception { * This writes the actual data in the exportRootDataDir from the source. */ private void copyFiles() throws IOException, LoginException { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath); + if (mmCtx == null) { + copyOneDataPath(dataFileListPath, exportRootDataDir); + } else { + copyMmPath(); + } + } + + private void copyMmPath() throws LoginException, IOException { + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + Path fromPath = dataFileSystem.makeQualified(dataFileListPath); + List validPaths = Utilities.getValidMmDirectoriesFromTableOrPart( + fromPath, hiveConf, ids, mmCtx.getLbLevels()); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. + String mmChildPath = "not_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + } + + private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); List srcPaths = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { srcPaths.add(fileStatus.getPath()); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); } /** @@ -82,6 +121,10 @@ private void copyFiles() throws IOException, LoginException { * in the exportRootDataDir provided. */ private void exportFilesAsList() throws SemanticException, IOException { + if (mmCtx != null) { + // TODO: fix when the replication stuff is done for MM tables. 
+ throw new SemanticException("This replication path is not supported for MM tables"); + } try (BufferedWriter writer = writer()) { FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath); @@ -99,7 +142,7 @@ private BufferedWriter writer() throws IOException { exportToFile.toString() + " already exists and cant export data from path(dir) " + dataFileListPath); } - logger.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile); + LOG.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile); return new BufferedWriter( new OutputStreamWriter(exportFileSystem.create(exportToFile)) ); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index c0e4a43d9c..d0bad29bbd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,7 +33,6 @@ private Path[] fromPath; private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isSkipMmDirs = false; public CopyWork() { } @@ -92,17 +91,4 @@ public void setErrorOnSrcEmpty(boolean errorOnSrcEmpty) { public boolean isErrorOnSrcEmpty() { return errorOnSrcEmpty; } - - /** - * Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. - * */ - public void setSkipSourceMmDirs(boolean isMm) { - this.isSkipMmDirs = isMm; - } - - public boolean doSkipSourceMmDirs() { - return isSkipMmDirs ; - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 72ce79836c..6bdd67a05a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,11 +17,17 @@ */ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,26 +36,58 @@ @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { - private Logger LOG = LoggerFactory.getLogger(ExportWork.class); + private static Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; + public final static class MmContext { + private final int lbLevels; + private final String fqTableName; + + private MmContext(int lbLevels, String fqTableName) { + this.lbLevels = lbLevels; + this.fqTableName = fqTableName; + } + + @Override + public String toString() { + return "[" + fqTableName + "; lbLevels=" + lbLevels + "]"; + } + + public static MmContext createIfNeeded(Table t) { + if (t == null) return null; + if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null; + int lbLevels = t.isStoredAsSubDirectories() ? 
t.getSkewedColNames().size() : 0; + return new MmContext(lbLevels, AcidUtils.getFullTableName(t.getDbName(), t.getTableName())); + } + + public String getFqTableName() { + return fqTableName; + } + + public int getLbLevels() { + return lbLevels; + } + } + private final String exportRootDirName; private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; - private String qualifiedTableName; + private String acidFqTableName; + private final MmContext mmContext; /** - * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg, String qualifiedTable) { + String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; - this.qualifiedTableName = qualifiedTable; + this.mmContext = mmContext; + this.acidFqTableName = acidFqTableName; } public String getExportRootDir() { @@ -60,24 +98,16 @@ public TableSpec getTableSpec() { return tableSpec; } - public void setTableSpec(TableSpec tableSpec) { - this.tableSpec = tableSpec; - } - public ReplicationSpec getReplicationSpec() { return replicationSpec; } - public void setReplicationSpec(ReplicationSpec replicationSpec) { - this.replicationSpec = replicationSpec; - } - public String getAstRepresentationForErrorMsg() { return astRepresentationForErrorMsg; } - public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { - this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + public MmContext getMmContext() { + return mmContext; } /** @@ -88,10 +118,10 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) * for more info. 
*/ public void acidPostProcess(Hive db) throws HiveException { - if(qualifiedTableName != null) { - LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + if (acidFqTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName + " using partSpec=" + tableSpec.partSpec); - tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true); } } } diff --git ql/src/test/queries/clientpositive/mm_exim.q ql/src/test/queries/clientpositive/mm_exim.q index c47342bd23..a2b6e08603 100644 --- ql/src/test/queries/clientpositive/mm_exim.q +++ ql/src/test/queries/clientpositive/mm_exim.q @@ -59,13 +59,13 @@ drop table import1_mm; drop table import2_mm; import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; -desc import2_mm; +desc formatted import2_mm; select * from import2_mm order by key, p; drop table import2_mm; drop table import3_mm; import table import3_mm from 'ql/test/data/exports/intermmediate_part'; -desc import3_mm; +desc formatted import3_mm; select * from import3_mm order by key, p; drop table import3_mm; diff --git ql/src/test/results/clientpositive/llap/mm_exim.q.out ql/src/test/results/clientpositive/llap/mm_exim.q.out index 1f40754373..e9798c35f2 100644 --- ql/src/test/results/clientpositive/llap/mm_exim.q.out +++ ql/src/test/results/clientpositive/llap/mm_exim.q.out @@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@import2_mm -PREHOOK: query: desc import2_mm +PREHOOK: query: desc formatted import2_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import2_mm -POSTHOOK: query: desc import2_mm +POSTHOOK: query: desc formatted import2_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import2_mm +# col_name data_type comment key int p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE {} + numFiles 3 + numRows 6 + rawDataSize 37 + totalSize 43 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import2_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import2_mm @@ -338,18 +366,45 @@ POSTHOOK: Output: default@import3_mm POSTHOOK: Output: default@import3_mm@p=455 POSTHOOK: Output: default@import3_mm@p=456 POSTHOOK: Output: default@import3_mm@p=457 -PREHOOK: query: desc import3_mm +PREHOOK: query: desc formatted import3_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import3_mm -POSTHOOK: query: desc import3_mm +POSTHOOK: query: desc formatted import3_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import3_mm +# col_name data_type comment key int -p int # Partition Information # col_name data_type comment p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numPartitions 3 + numRows 
6 + rawDataSize 13 + totalSize 19 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import3_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import3_mm
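The refreshed desc formatted output above confirms the round trip keeps the imported tables insert-only (transactional true, transactional_properties insert_only) with the expected file counts. As a purely illustrative complement (not part of the tests), a check like the following could assert the export-side layout that copyMmPath produces, assuming exportDataDir points at the dumped data directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ExportLayoutCheck {
  // Valid MM directories are dumped under "not_"-prefixed names, so a correct export
  // contains no raw base_*/delta_* directories for an import to misread as ACID data.
  static void assertNoAcidDirs(Configuration conf, Path exportDataDir) throws Exception {
    FileSystem fs = exportDataDir.getFileSystem(conf);
    for (FileStatus child : fs.listStatus(exportDataDir)) {
      String name = child.getPath().getName();
      if (name.startsWith("delta_") || name.startsWith("base_")) {
        throw new AssertionError("Unexpected ACID directory in export: " + name);
      }
    }
  }
}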