diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index b0ec5abcce..1a8e5e79e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -63,14 +63,25 @@ public int execute(DriverContext driverContext) { protected int copyOnePath(Path fromPath, Path toPath) { FileSystem dstFs = null; try { - Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath); + Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath); console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString()); FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs()); + FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter()); + + // TODO: this is very brittle given that Hive supports nested directories in the tables. + // The caller should pass a flag explicitly telling us if the directories in the + // input are data, or parent of data. For now, retain this for backward compat. + if (srcs != null && srcs.length == 1 && srcs[0].isDirectory() + /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) - still broken for partitions*/) { + Utilities.FILE_OP_LOGGER.debug( + "Recursing into a single child directory {}", srcs[0].getPath().getName()); + srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -107,40 +118,6 @@ protected int copyOnePath(Path fromPath, Path toPath) { } } - // Note: initially copied from LoadSemanticAnalyzer. - private static FileStatus[] matchFilesOrDir( - FileSystem fs, Path path, boolean isSourceMm) throws IOException { - if (!fs.exists(path)) return null; - if (!isSourceMm) return matchFilesOneDir(fs, path, null); - // Note: this doesn't handle list bucketing properly; neither does the original code. 
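// Illustrative sketch (annotation, not part of the patch): what the inlined matching above now
// does, assuming srcFs and fromPath are the source FileSystem and import path from copyOnePath.
FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter());
if (srcs != null && srcs.length == 1 && srcs[0].isDirectory()) {
  // fromPath itself matched as a single directory (e.g. <export root>/data); step into it so
  // its children become the copy sources.
  srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
}
// For an MM table exported by this patch those children are the renamed snapshot directories
// (e.g. export_old_delta_0000001_0000001_0000/000000_0 rather than delta_*), which is why the
// isSourceMm special-casing in the matchFilesOrDir code removed below is no longer needed.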
- FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter()); - if (mmDirs == null || mmDirs.length == 0) return null; - List allFiles = new ArrayList(); - for (FileStatus mmDir : mmDirs) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath()); - } - matchFilesOneDir(fs, mmDir.getPath(), allFiles); - } - return allFiles.toArray(new FileStatus[allFiles.size()]); - } - - private static FileStatus[] matchFilesOneDir( - FileSystem fs, Path path, List result) throws IOException { - FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); - if (srcs != null && srcs.length == 1) { - if (srcs[0].isDirectory()) { - srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } - } - if (result != null && srcs != null) { - for (int i = 0; i < srcs.length; ++i) { - result.add(srcs[i]); - } - } - return srcs; - } - private static final class EximPathFilter implements PathFilter { @Override public boolean accept(Path p) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index aba65918f8..119d7923a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -47,15 +47,13 @@ public String getName() { protected int execute(DriverContext driverContext) { try { // Also creates the root directory - TableExport.Paths exportPaths = - new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), - conf, false); + TableExport.Paths exportPaths = new TableExport.Paths( + work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); work.acidPostProcess(db); - TableExport tableExport = new TableExport( - exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf - ); + TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), + work.getReplicationSpec(), db, null, conf, work.getMmContext()); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index b5a7853101..b10241800b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -72,6 +73,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index ce0757cba2..910fad6aa4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -276,7 +277,9 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false - new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write(); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index d3c62a2775..4a366a9360 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Set; + +import javax.annotation.Nullable; + import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; - -import javax.annotation.Nullable; -import java.util.Set; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; /** * ExportSemanticAnalyzer. 
* */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { + private boolean isMmExport = false; ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -48,7 +52,9 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + Task task = analyzeExport(ast, null, db, conf, inputs, outputs); + isMmExport = task.getWork().getMmContext() != null; + rootTasks.add(task); } /** * @param acidTableName - table name in db.table format; not NULL if exporting Acid table @@ -80,12 +86,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { try { ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ + } catch (SemanticException sme) { + if (!replicationSpec.isInReplicationScope()) throw sme; + if ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException)) { // If we're in replication scope, it's possible that we're running the export long after // the table was dropped, so the table not existing currently or being a different kind of // table is not an error - it simply means we should no-op, and let a future export @@ -101,15 +105,26 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // All parsing is done, we're now good to start the export process TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + // Note: this tableExport is actually never used other than for auth, and another one is + // created when the task is executed. So, we don't care about the correct MM state here. + TableExport.AuthEntities authEntities = new TableExport( + exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; + MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); + + Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}", + ts == null ? null : ts.tableName, mmCtx); // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); + ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec, + ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx); // Create an export task and add it as a root task - return TaskFactory.get(exportWork); + return TaskFactory.get(exportWork); + } + + @Override + public boolean hasTransactionalInQuery() { + return isMmExport; // Full ACID export goes thru UpdateDelete analyzer. 
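// Illustrative note (annotation, not part of the patch), tying the pieces above together:
//   MmContext mmCtx = MmContext.createIfNeeded(ts.tableHandle);  // non-null only for
//                                                                // insert-only (MM) tables
// Full ACID exports never reach this analyzer (SemanticAnalyzerFactory routes them to
// UpdateDeleteSemanticAnalyzer via isAcidExport), so the MM case is the only transactional one
// left to report here. Surfacing it through hasTransactionalInQuery() presumably lets the
// driver record a valid write-ID snapshot for the source table, which FileOperations later
// reads via AcidUtils.getTableValidWriteIdList().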
} } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index b850ddc9d0..22dc130a8d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -381,7 +381,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId, boolean isSourceMm) { + Long writeId, int stmtId) { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); @@ -422,9 +422,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(dataPath, destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } LoadTableDesc loadTableWork = new LoadTableDesc( @@ -479,7 +477,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -516,9 +514,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } Task addPartTask = TaskFactory.get( @@ -829,8 +825,6 @@ private static void createRegularImportTasks( EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws HiveException, IOException, MetaException { - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); @@ -840,7 +834,7 @@ private static void createRegularImportTasks( org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -852,8 +846,7 @@ private static void createRegularImportTasks( Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, 
replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, - isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -872,7 +865,7 @@ private static void createRegularImportTasks( if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm)); + replicationSpec, x, writeId, stmtId)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -890,7 +883,7 @@ private static void createRegularImportTasks( FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, - writeId, stmtId, isSourceMm)); + writeId, stmtId)); } } x.getTasks().add(t); @@ -922,7 +915,6 @@ private static void createReplImportTasks( throws HiveException, URISyntaxException, IOException, MetaException { Task dropTblTask = null; - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -1006,14 +998,14 @@ private static void createReplImportTasks( for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId)); } } @@ -1036,7 +1028,7 @@ private static void createReplImportTasks( if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1053,7 +1045,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@ -1079,7 +1071,7 @@ private static void createReplImportTasks( if 
(!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), - replicationSpec, x, writeId, stmtId, isSourceMm); + replicationSpec, x, writeId, stmtId); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 820046388a..70295da960 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -215,7 +215,7 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { return new UpdateDeleteSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 5844f3d97f..a06932611a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -25,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,8 @@ * it has a blocking queue that stores partitions to be dumped via a producer thread. * it has a worker thread pool that reads of the queue to perform the various tasks. */ +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
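// Illustrative note (annotation, not part of the patch): the MmContext passed in below is
// table-level (it only carries the fully qualified table name), so every partition worker
// resolves the same snapshot, roughly:
//   ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
// Per-partition state continues to come from Paths.partitionExportDir(partitionName) rather
// than from mmCtx.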
class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; @@ -52,13 +56,15 @@ private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue queue; + private final MmContext mmCtx; PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, - HiveConf hiveConf) { + HiveConf hiveConf, MmContext mmCtx) { this.paths = paths; this.partitionIterable = partitionIterable; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); this.queue = new ArrayBlockingQueue<>(2 * nThreads); } @@ -101,7 +107,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException try { // this the data copy Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf) + new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(forReplicationSpec); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index abb2e8874b..6b563c44c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
public class TableExport { private static final Logger logger = LoggerFactory.getLogger(TableExport.class); @@ -59,9 +62,10 @@ private final String distCpDoAsUser; private final HiveConf conf; private final Paths paths; + private final MmContext mmCtx; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -76,6 +80,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; this.paths = paths; + this.mmCtx = mmCtx; } public boolean write() throws SemanticException { @@ -147,11 +152,11 @@ private void writeData(PartitionIterable partitions) throws SemanticException { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { Path fromPath = tableSpec.tableHandle.getDataLocation(); // this is the data copy - new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf) + new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(replicationSpec); } } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 866d3513b1..7f8a9f0871 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,40 +17,51 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; + +import javax.security.auth.login.LoginException; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.ExportWork; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.List; - +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
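// Illustrative sketch (annotation, not part of the patch) of the MM export path implemented
// below: only directories in the current valid write-ID snapshot are copied, each under a
// neutral name so the export never contains base_*/delta_* directories.
ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
AcidUtils.Directory dir = AcidUtils.getAcidState(fromPath, hiveConf, ids);
// dir.getBaseDirectory() and dir.getCurrentDirectories() are the valid paths; a source
// directory such as delta_0000001_0000001_0000 lands in the dump as, e.g.,
// data/export_old_delta_0000001_0000001_0000. Aborted or open deltas are left out because
// getAcidState() filters them against the snapshot (see testMmExim / testMMExportAborted).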
public class FileOperations { - private static Logger logger = LoggerFactory.getLogger(FileOperations.class); + private static Logger LOG = LoggerFactory.getLogger(FileOperations.class); private final Path dataFileListPath; private final Path exportRootDataDir; private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; + private final MmContext mmCtx; - public FileOperations(Path dataFileListPath, Path exportRootDataDir, - String distCpDoAsUser, HiveConf hiveConf) throws IOException { + public FileOperations(Path dataFileListPath, Path exportRootDataDir, String distCpDoAsUser, + HiveConf hiveConf, ExportWork.MmContext mmCtx) throws IOException { this.dataFileListPath = dataFileListPath; this.exportRootDataDir = exportRootDataDir; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; dataFileSystem = dataFileListPath.getFileSystem(hiveConf); exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } @@ -67,13 +78,53 @@ public void export(ReplicationSpec forReplicationSpec) throws Exception { * This writes the actual data in the exportRootDataDir from the source. */ private void copyFiles() throws IOException, LoginException { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath); + if (mmCtx == null) { + copyOneDataPath(dataFileListPath, exportRootDataDir); + } else { + copyMmPath(); + } + } + + private void copyMmPath() throws LoginException, IOException { + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + Path fromPath = dataFileSystem.makeQualified(dataFileListPath); + List validPaths = getMmValidPaths(ids, fromPath); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. 
+ String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + } + + private List getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException { + Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath); + AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids); + List validPaths = new ArrayList<>(); + Path base = acidState.getBaseDirectory(); + if (base != null) { + validPaths.add(base); + } + for (ParsedDelta pd : acidState.getCurrentDirectories()) { + validPaths.add(pd.getPath()); + } + return validPaths; + } + + private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); List srcPaths = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { srcPaths.add(fileStatus.getPath()); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); } /** @@ -83,15 +134,27 @@ private void copyFiles() throws IOException, LoginException { */ private void exportFilesAsList() throws SemanticException, IOException { try (BufferedWriter writer = writer()) { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath); - for (FileStatus fileStatus : fileStatuses) { - writer.write(encodedUri(fileStatus)); - writer.newLine(); + if (mmCtx != null) { + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + List validPaths = getMmValidPaths(ids, dataFileListPath); + for (Path mmPath : validPaths) { + writeStatuses(writer, dataFileSystem.listStatus(mmPath, AcidUtils.hiddenFileFilter)); + } + } else { + writeStatuses(writer, + LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath)); } } } + private void writeStatuses(BufferedWriter writer, FileStatus[] fileStatuses) + throws IOException { + for (FileStatus fileStatus : fileStatuses) { + writer.write(encodedUri(fileStatus)); + writer.newLine(); + } + } + private BufferedWriter writer() throws IOException { Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); if (exportFileSystem.exists(exportToFile)) { @@ -99,7 +162,7 @@ private BufferedWriter writer() throws IOException { exportToFile.toString() + " already exists and cant export data from path(dir) " + dataFileListPath); } - logger.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile); + LOG.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile); return new BufferedWriter( new OutputStreamWriter(exportFileSystem.create(exportToFile)) ); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index c0e4a43d9c..018983f6dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,15 +33,10 @@ private Path[] fromPath; private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isSkipMmDirs = false; public CopyWork() { } - public CopyWork(final Path fromPath, final Path toPath) { - this(fromPath, toPath, true); - } - public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { this(new Path[] { fromPath }, new Path[] { 
toPath }); this.setErrorOnSrcEmpty(errorOnSrcEmpty); @@ -92,17 +87,4 @@ public void setErrorOnSrcEmpty(boolean errorOnSrcEmpty) { public boolean isErrorOnSrcEmpty() { return errorOnSrcEmpty; } - - /** - * Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. - * */ - public void setSkipSourceMmDirs(boolean isMm) { - this.isSkipMmDirs = isMm; - } - - public boolean doSkipSourceMmDirs() { - return isSkipMmDirs ; - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 72ce79836c..d91569ec6d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,39 +17,65 @@ */ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; - @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { - private Logger LOG = LoggerFactory.getLogger(ExportWork.class); + private static Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; + public final static class MmContext { + private final String fqTableName; + + private MmContext(String fqTableName) { + this.fqTableName = fqTableName; + } + + @Override + public String toString() { + return "[" + fqTableName + "]"; + } + + public static MmContext createIfNeeded(Table t) { + if (t == null) return null; + if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null; + return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName())); + } + + public String getFqTableName() { + return fqTableName; + } + } + private final String exportRootDirName; private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; - private String qualifiedTableName; + private String acidFqTableName; + private final MmContext mmContext; /** - * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg, String qualifiedTable) { + String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; - this.qualifiedTableName = qualifiedTable; + this.mmContext = mmContext; + this.acidFqTableName = acidFqTableName; } public String getExportRootDir() { @@ -60,24 +86,16 @@ public TableSpec getTableSpec() { return tableSpec; } - public void setTableSpec(TableSpec tableSpec) { - this.tableSpec = tableSpec; - } - public ReplicationSpec getReplicationSpec() { return replicationSpec; } - 
public void setReplicationSpec(ReplicationSpec replicationSpec) { - this.replicationSpec = replicationSpec; - } - public String getAstRepresentationForErrorMsg() { return astRepresentationForErrorMsg; } - public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { - this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + public MmContext getMmContext() { + return mmContext; } /** @@ -88,10 +106,10 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) * for more info. */ public void acidPostProcess(Hive db) throws HiveException { - if(qualifiedTableName != null) { - LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + if (acidFqTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName + " using partSpec=" + tableSpec.partSpec); - tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true); } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 6a3be39ce4..c13c3f2ab0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,9 +17,24 @@ */ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; @@ -47,13 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; - /** * The LockManager is not ready, but for no-concurrency straight-line path we can * test AC=true, and AC=false with commit/rollback/exception and test resulting data. @@ -137,6 +145,106 @@ public void testSimpleAcidInsert() throws Exception { Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); } + @Test + public void testMmExim() throws Exception { + String tableName = "mm_table", importName = tableName + "_import"; + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + + // Regular insert: export some MM deltas, then import into a new table. 
+ int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(hiveConf); + Path exportPath = new Path(table.getSd().getLocation() + "_export"); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + List paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 2); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName); + Assert.assertEquals(imported.toString(), "insert_only", + imported.getParameters().get("transactional_properties")); + Path importPath = new Path(imported.getSd().getLocation()); + FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + Assert.assertEquals(Arrays.toString(stat), 1, stat.length); + assertIsDelta(stat[0]); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows1)); + allData.sort(null); + Collections.sort(allData); + List rs = runStatementOnDriver( + String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + + // Do insert overwrite to create some invalid deltas, and import into a non-MM table. + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver(String.format("insert overwrite table %s %s", + tableName, makeValuesClause(rows2))); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 1); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='false')", importName)); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + imported = msClient.getTable("default", importName); + Assert.assertNull(imported.toString(), imported.getParameters().get("transactional")); + Assert.assertNull(imported.toString(), + imported.getParameters().get("transactional_properties")); + importPath = new Path(imported.getSd().getLocation()); + stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + allData = stringifyValues(rows2); + Collections.sort(allData); + rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + runStatementOnDriver("drop table if exists " + tableName); + msClient.close(); + } + + private void assertIsDelta(FileStatus stat) { + Assert.assertTrue(stat.toString(), + stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); + } + + private void verifyMmExportPaths(List paths, int deltasOrBases) { + // 1 file, 1 dir for each, for now. Plus export "data" dir. + // This could be changed to a flat file list later. + Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size()); + // No confusing directories in export. 
+ for (String path : paths) { + Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX)); + Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX)); + } + } + + private List listPathsRecursive(FileSystem fs, Path path) throws IOException { + List paths = new ArrayList<>(); + LinkedList queue = new LinkedList<>(); + queue.add(path); + while (!queue.isEmpty()) { + Path next = queue.pollFirst(); + FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter); + for (FileStatus stat : stats) { + Path child = stat.getPath(); + paths.add(child.toString()); + if (stat.isDirectory()) { + queue.add(child); + } + } + } + return paths; + } + + /** * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example) * @throws Exception diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index 6daac1b789..4f1d38403d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -477,6 +477,7 @@ public void testMMCreateFlatSource() throws Exception { } private void testMM(boolean existingTable, boolean isSourceMM) throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1,2}, {3, 4}, {5, 6}}; runStatementOnDriver("drop table if exists T"); @@ -500,9 +501,10 @@ private void testMM(boolean existingTable, boolean isSourceMM) throws Exception //verify that we are indeed doing an Acid write (import) rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); Assert.assertEquals(3, rs.size()); - Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0")); + for (String s : rs) { + Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/")); + Assert.assertTrue(s, s.endsWith("/000000_0")); + } } private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ @@ -516,6 +518,7 @@ private void checkResult(String[][] expectedResult, String query, boolean isVect @Test public void testMMExportAborted() throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1, 2}, {3, 4}, {5, 6}}; int[][] dataAbort = {{10, 2}}; runStatementOnDriver("drop table if exists T"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index a2adb966fe..a88a570c52 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -207,7 +207,8 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); logResult(LOG, rs); - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs, + expected.length, rs.size()); //verify data and layout for(int i = 0; i < expected.length; i++) { Assert.assertTrue("Actual line 
(data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); diff --git ql/src/test/queries/clientpositive/mm_exim.q ql/src/test/queries/clientpositive/mm_exim.q index c47342bd23..a2b6e08603 100644 --- ql/src/test/queries/clientpositive/mm_exim.q +++ ql/src/test/queries/clientpositive/mm_exim.q @@ -59,13 +59,13 @@ drop table import1_mm; drop table import2_mm; import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; -desc import2_mm; +desc formatted import2_mm; select * from import2_mm order by key, p; drop table import2_mm; drop table import3_mm; import table import3_mm from 'ql/test/data/exports/intermmediate_part'; -desc import3_mm; +desc formatted import3_mm; select * from import3_mm order by key, p; drop table import3_mm; diff --git ql/src/test/results/clientpositive/llap/mm_exim.q.out ql/src/test/results/clientpositive/llap/mm_exim.q.out index 1f40754373..b277fed91d 100644 --- ql/src/test/results/clientpositive/llap/mm_exim.q.out +++ ql/src/test/results/clientpositive/llap/mm_exim.q.out @@ -292,14 +292,41 @@ POSTHOOK: type: IMPORT #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@import2_mm -PREHOOK: query: desc import2_mm +PREHOOK: query: desc formatted import2_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import2_mm -POSTHOOK: query: desc import2_mm +POSTHOOK: query: desc formatted import2_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import2_mm +# col_name data_type comment key int p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numRows 6 + rawDataSize 37 + totalSize 43 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import2_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import2_mm @@ -338,18 +365,45 @@ POSTHOOK: Output: default@import3_mm POSTHOOK: Output: default@import3_mm@p=455 POSTHOOK: Output: default@import3_mm@p=456 POSTHOOK: Output: default@import3_mm@p=457 -PREHOOK: query: desc import3_mm +PREHOOK: query: desc formatted import3_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import3_mm -POSTHOOK: query: desc import3_mm +POSTHOOK: query: desc formatted import3_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import3_mm +# col_name data_type comment key int -p int # Partition Information # col_name data_type comment p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numPartitions 3 + numRows 6 + rawDataSize 13 + totalSize 19 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + 
serialization.format 1 PREHOOK: query: select * from import3_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import3_mm