diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index b0ec5abcce..1a8e5e79e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -63,14 +63,25 @@ public int execute(DriverContext driverContext) { protected int copyOnePath(Path fromPath, Path toPath) { FileSystem dstFs = null; try { - Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath); + Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath); console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString()); FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs()); + FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter()); + + // TODO: this is very brittle given that Hive supports nested directories in the tables. + // The caller should pass a flag explicitly telling us if the directories in the + // input are data, or parent of data. For now, retain this for backward compat. + if (srcs != null && srcs.length == 1 && srcs[0].isDirectory() + /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) - still broken for partitions*/) { + Utilities.FILE_OP_LOGGER.debug( + "Recursing into a single child directory {}", srcs[0].getPath().getName()); + srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -107,40 +118,6 @@ protected int copyOnePath(Path fromPath, Path toPath) { } } - // Note: initially copied from LoadSemanticAnalyzer. - private static FileStatus[] matchFilesOrDir( - FileSystem fs, Path path, boolean isSourceMm) throws IOException { - if (!fs.exists(path)) return null; - if (!isSourceMm) return matchFilesOneDir(fs, path, null); - // Note: this doesn't handle list bucketing properly; neither does the original code. 
- FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter()); - if (mmDirs == null || mmDirs.length == 0) return null; - List allFiles = new ArrayList(); - for (FileStatus mmDir : mmDirs) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath()); - } - matchFilesOneDir(fs, mmDir.getPath(), allFiles); - } - return allFiles.toArray(new FileStatus[allFiles.size()]); - } - - private static FileStatus[] matchFilesOneDir( - FileSystem fs, Path path, List result) throws IOException { - FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); - if (srcs != null && srcs.length == 1) { - if (srcs[0].isDirectory()) { - srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } - } - if (result != null && srcs != null) { - for (int i = 0; i < srcs.length; ++i) { - result.add(srcs[i]); - } - } - return srcs; - } - private static final class EximPathFilter implements PathFilter { @Override public boolean accept(Path p) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index aba65918f8..119d7923a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -47,15 +47,13 @@ public String getName() { protected int execute(DriverContext driverContext) { try { // Also creates the root directory - TableExport.Paths exportPaths = - new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), - conf, false); + TableExport.Paths exportPaths = new TableExport.Paths( + work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir()); work.acidPostProcess(db); - TableExport tableExport = new TableExport( - exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf - ); + TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), + work.getReplicationSpec(), db, null, conf, work.getMmContext()); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 31846a38e0..406bea011d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -72,6 +73,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 88d352b1d4..ccdf04aae7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,7 +289,10 @@ private void dumpTable(String dbName, String tblName, String validTxnList, Path if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); } - new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write(); + replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index d3c62a2775..4a366a9360 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Set; + +import javax.annotation.Nullable; + import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; - -import javax.annotation.Nullable; -import java.util.Set; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; /** * ExportSemanticAnalyzer. 
* */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { + private boolean isMmExport = false; ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -48,7 +52,9 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + Task task = analyzeExport(ast, null, db, conf, inputs, outputs); + isMmExport = task.getWork().getMmContext() != null; + rootTasks.add(task); } /** * @param acidTableName - table name in db.table format; not NULL if exporting Acid table @@ -80,12 +86,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { try { ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ + } catch (SemanticException sme) { + if (!replicationSpec.isInReplicationScope()) throw sme; + if ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException)) { // If we're in replication scope, it's possible that we're running the export long after // the table was dropped, so the table not existing currently or being a different kind of // table is not an error - it simply means we should no-op, and let a future export @@ -101,15 +105,26 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // All parsing is done, we're now good to start the export process TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + // Note: this tableExport is actually never used other than for auth, and another one is + // created when the task is executed. So, we don't care about the correct MM state here. + TableExport.AuthEntities authEntities = new TableExport( + exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; + MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); + + Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}", + ts == null ? null : ts.tableName, mmCtx); // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); + ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec, + ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx); // Create an export task and add it as a root task - return TaskFactory.get(exportWork); + return TaskFactory.get(exportWork); + } + + @Override + public boolean hasTransactionalInQuery() { + return isMmExport; // Full ACID export goes thru UpdateDelete analyzer. 
} } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index e6a70121f0..e597872e86 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -382,7 +382,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId, boolean isSourceMm) { + Long writeId, int stmtId) { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); @@ -423,9 +423,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(dataPath, destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } LoadTableDesc loadTableWork = new LoadTableDesc( @@ -480,7 +478,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -517,9 +515,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } Task addPartTask = TaskFactory.get( @@ -830,8 +826,6 @@ private static void createRegularImportTasks( EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws HiveException, IOException, MetaException { - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); @@ -841,7 +835,7 @@ private static void createRegularImportTasks( org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -853,8 +847,7 @@ private static void createRegularImportTasks( Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, 
replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, - isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -873,7 +866,7 @@ private static void createRegularImportTasks( if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm)); + replicationSpec, x, writeId, stmtId)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -891,7 +884,7 @@ private static void createRegularImportTasks( FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, - writeId, stmtId, isSourceMm)); + writeId, stmtId)); } } x.getTasks().add(t); @@ -923,7 +916,6 @@ private static void createReplImportTasks( throws HiveException, URISyntaxException, IOException, MetaException { Task dropTblTask = null; - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -1007,14 +999,14 @@ private static void createReplImportTasks( for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId)); } } @@ -1037,7 +1029,7 @@ private static void createReplImportTasks( if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1054,7 +1046,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@ -1080,7 +1072,7 @@ private static void createReplImportTasks( if 
(!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(), - replicationSpec, x, writeId, stmtId, isSourceMm); + replicationSpec, x, writeId, stmtId); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 820046388a..70295da960 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -215,7 +215,7 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { return new UpdateDeleteSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 70eb750338..d73fc4f336 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,8 @@ * it has a blocking queue that stores partitions to be dumped via a producer thread. * it has a worker thread pool that reads of the queue to perform the various tasks. */ +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
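For orientation before the PartitionExport changes: the MmContext introduced by this patch is created once at analysis (or repl dump) time and then threaded unchanged down to the file-level copy. The sketch below is not new code, just the relevant call sites from elsewhere in this patch condensed into one place, with surrounding logic and error handling omitted.

    // In ExportSemanticAnalyzer / ReplDumpTask: non-null only for insert-only (MM) tables.
    MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle);
    ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec,
        ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx);

    // In ExportTask.execute(): the context rides along on the work object.
    TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
        work.getReplicationSpec(), db, null, conf, work.getMmContext());

    // TableExport passes the same MmContext to PartitionExport (per-partition dumps) and to
    // FileOperations, which switch to write-id-aware copying whenever it is non-null.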
class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; @@ -50,16 +53,18 @@ private final HiveConf hiveConf; private final int nThreads; private final SessionState callersSession; + private final MmContext mmCtx; private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue queue; PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, - HiveConf hiveConf) { + HiveConf hiveConf, MmContext mmCtx) { this.paths = paths; this.partitionIterable = partitionIterable; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); this.queue = new ArrayBlockingQueue<>(2 * nThreads); this.callersSession = SessionState.get(); @@ -106,7 +111,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf) + new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(forReplicationSpec); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index d0aeee5030..2a3986a317 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
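TableExport gains the same MmContext parameter below. For reference while reading it: the context is only ever non-null for insert-only transactional tables; the decision lives in ExportWork.MmContext.createIfNeeded(), which appears later in this patch and is condensed here:

    // Condensed from ExportWork.MmContext.createIfNeeded() further down in this patch.
    public static MmContext createIfNeeded(Table t) {
      if (t == null) return null;
      if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null;
      return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName()));
    }

Full ACID tables never take this route: SemanticAnalyzerFactory (above) still sends their exports to UpdateDeleteSemanticAnalyzer, so the MM path here only ever sees insert-only tables.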
public class TableExport { private static final Logger logger = LoggerFactory.getLogger(TableExport.class); @@ -59,9 +62,10 @@ private final String distCpDoAsUser; private final HiveConf conf; private final Paths paths; + private final MmContext mmCtx; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -76,6 +80,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; this.paths = paths; + this.mmCtx = mmCtx; } public boolean write() throws SemanticException { @@ -147,13 +152,13 @@ private void writeData(PartitionIterable partitions) throws SemanticException { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); // this is the data copy - new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf) + new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(replicationSpec); } } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 690498ff6b..b61a945d94 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -20,24 +20,31 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; + import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; +//TODO: this object is created once to call one method and then immediately destroyed. +//So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
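The core of the MM-aware export added to FileOperations below is write-id filtering: rather than copying whatever sits under the table or partition directory, only the base directory (if present) and the currently valid deltas are selected. A condensed view of what the new getMmValidPaths() does:

    ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
    AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids);
    List<Path> validPaths = new ArrayList<>();
    if (acidState.getBaseDirectory() != null) {
      validPaths.add(acidState.getBaseDirectory());
    }
    for (AcidUtils.ParsedDelta delta : acidState.getCurrentDirectories()) {
      validPaths.add(delta.getPath());
    }

Directories written by aborted or still-open transactions are not in the valid write-id list, which is the behaviour testMMExportAborted in TestTxnExIm exercises.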
public class FileOperations { private static Logger logger = LoggerFactory.getLogger(FileOperations.class); private final List dataPathList; @@ -45,13 +52,15 @@ private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; + private final MmContext mmCtx; - public FileOperations(List dataPathList, Path exportRootDataDir, - String distCpDoAsUser, HiveConf hiveConf) throws IOException { + public FileOperations(List dataPathList, Path exportRootDataDir, String distCpDoAsUser, + HiveConf hiveConf, MmContext mmCtx) throws IOException { this.dataPathList = dataPathList; this.exportRootDataDir = exportRootDataDir; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; if ((dataPathList != null) && !dataPathList.isEmpty()) { dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); } else { @@ -72,17 +81,59 @@ public void export(ReplicationSpec forReplicationSpec) throws Exception { * This writes the actual data in the exportRootDataDir from the source. */ private void copyFiles() throws IOException, LoginException { - for (Path dataPath : dataPathList) { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath); - List srcPaths = new ArrayList<>(); - for (FileStatus fileStatus : fileStatuses) { - srcPaths.add(fileStatus.getPath()); + if (mmCtx == null) { + for (Path dataPath : dataPathList) { + copyOneDataPath(dataPath, exportRootDataDir); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + } else { + copyMmPath(); + } + } + + private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); + List srcPaths = new ArrayList<>(); + for (FileStatus fileStatus : fileStatuses) { + srcPaths.add(fileStatus.getPath()); } + + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); } + private void copyMmPath() throws LoginException, IOException { + assert dataPathList.size() == 1; + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0)); + List validPaths = getMmValidPaths(ids, fromPath); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. + String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + } + + private List getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException { + Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath); + AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids); + List validPaths = new ArrayList<>(); + Path base = acidState.getBaseDirectory(); + if (base != null) { + validPaths.add(base); + } + for (ParsedDelta pd : acidState.getCurrentDirectories()) { + validPaths.add(pd.getPath()); + } + return validPaths; + } + + /** * This needs the root data directory to which the data needs to be exported to. 
* The data export here is a list of files either in table/partition that are written to the _files @@ -90,8 +141,19 @@ private void copyFiles() throws IOException, LoginException { */ private void exportFilesAsList() throws SemanticException, IOException { try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + if (mmCtx != null) { + assert dataPathList.size() == 1; + Path dataPath = dataPathList.get(0); + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList( + hiveConf, mmCtx.getFqTableName()); + List validPaths = getMmValidPaths(ids, dataPath); + for (Path mmPath : validPaths) { + writeFilesList(listFilesInDir(mmPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + } else { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index c0e4a43d9c..018983f6dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,15 +33,10 @@ private Path[] fromPath; private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isSkipMmDirs = false; public CopyWork() { } - public CopyWork(final Path fromPath, final Path toPath) { - this(fromPath, toPath, true); - } - public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { this(new Path[] { fromPath }, new Path[] { toPath }); this.setErrorOnSrcEmpty(errorOnSrcEmpty); @@ -92,17 +87,4 @@ public void setErrorOnSrcEmpty(boolean errorOnSrcEmpty) { public boolean isErrorOnSrcEmpty() { return errorOnSrcEmpty; } - - /** - * Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. 
- * */ - public void setSkipSourceMmDirs(boolean isMm) { - this.isSkipMmDirs = isMm; - } - - public boolean doSkipSourceMmDirs() { - return isSkipMmDirs ; - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 72ce79836c..d91569ec6d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,39 +17,65 @@ */ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; - @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { - private Logger LOG = LoggerFactory.getLogger(ExportWork.class); + private static Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; + public final static class MmContext { + private final String fqTableName; + + private MmContext(String fqTableName) { + this.fqTableName = fqTableName; + } + + @Override + public String toString() { + return "[" + fqTableName + "]"; + } + + public static MmContext createIfNeeded(Table t) { + if (t == null) return null; + if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null; + return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName())); + } + + public String getFqTableName() { + return fqTableName; + } + } + private final String exportRootDirName; private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; - private String qualifiedTableName; + private String acidFqTableName; + private final MmContext mmContext; /** - * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg, String qualifiedTable) { + String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; - this.qualifiedTableName = qualifiedTable; + this.mmContext = mmContext; + this.acidFqTableName = acidFqTableName; } public String getExportRootDir() { @@ -60,24 +86,16 @@ public TableSpec getTableSpec() { return tableSpec; } - public void setTableSpec(TableSpec tableSpec) { - this.tableSpec = tableSpec; - } - public ReplicationSpec getReplicationSpec() { return replicationSpec; } - public void setReplicationSpec(ReplicationSpec replicationSpec) { - this.replicationSpec = replicationSpec; - } - public String getAstRepresentationForErrorMsg() { return astRepresentationForErrorMsg; } - public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { - this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + public 
MmContext getMmContext() { + return mmContext; } /** @@ -88,10 +106,10 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) * for more info. */ public void acidPostProcess(Hive db) throws HiveException { - if(qualifiedTableName != null) { - LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + if (acidFqTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName + " using partSpec=" + tableSpec.partSpec); - tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true); } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 6faba42e23..3e2784ba2d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,9 +17,24 @@ */ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; @@ -47,13 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; - /** * The LockManager is not ready, but for no-concurrency straight-line path we can * test AC=true, and AC=false with commit/rollback/exception and test resulting data. @@ -152,6 +160,106 @@ public void testSimpleAcidInsert() throws Exception { Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); } + @Test + public void testMmExim() throws Exception { + String tableName = "mm_table", importName = tableName + "_import"; + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + + // Regular insert: export some MM deltas, then import into a new table. 
+ int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(hiveConf); + Path exportPath = new Path(table.getSd().getLocation() + "_export"); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + List paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 2); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName); + Assert.assertEquals(imported.toString(), "insert_only", + imported.getParameters().get("transactional_properties")); + Path importPath = new Path(imported.getSd().getLocation()); + FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + Assert.assertEquals(Arrays.toString(stat), 1, stat.length); + assertIsDelta(stat[0]); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows1)); + allData.sort(null); + Collections.sort(allData); + List rs = runStatementOnDriver( + String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + + // Do insert overwrite to create some invalid deltas, and import into a non-MM table. + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver(String.format("insert overwrite table %s %s", + tableName, makeValuesClause(rows2))); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 1); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='false')", importName)); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + imported = msClient.getTable("default", importName); + Assert.assertNull(imported.toString(), imported.getParameters().get("transactional")); + Assert.assertNull(imported.toString(), + imported.getParameters().get("transactional_properties")); + importPath = new Path(imported.getSd().getLocation()); + stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + allData = stringifyValues(rows2); + Collections.sort(allData); + rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + runStatementOnDriver("drop table if exists " + tableName); + msClient.close(); + } + + private void assertIsDelta(FileStatus stat) { + Assert.assertTrue(stat.toString(), + stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); + } + + private void verifyMmExportPaths(List paths, int deltasOrBases) { + // 1 file, 1 dir for each, for now. Plus export "data" dir. + // This could be changed to a flat file list later. + Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size()); + // No confusing directories in export. 
+ for (String path : paths) { + Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX)); + Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX)); + } + } + + private List listPathsRecursive(FileSystem fs, Path path) throws IOException { + List paths = new ArrayList<>(); + LinkedList queue = new LinkedList<>(); + queue.add(path); + while (!queue.isEmpty()) { + Path next = queue.pollFirst(); + FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter); + for (FileStatus stat : stats) { + Path child = stat.getPath(); + paths.add(child.toString()); + if (stat.isDirectory()) { + queue.add(child); + } + } + } + return paths; + } + + /** * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example) * @throws Exception diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index 6daac1b789..4f1d38403d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -477,6 +477,7 @@ public void testMMCreateFlatSource() throws Exception { } private void testMM(boolean existingTable, boolean isSourceMM) throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1,2}, {3, 4}, {5, 6}}; runStatementOnDriver("drop table if exists T"); @@ -500,9 +501,10 @@ private void testMM(boolean existingTable, boolean isSourceMM) throws Exception //verify that we are indeed doing an Acid write (import) rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); Assert.assertEquals(3, rs.size()); - Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0")); + for (String s : rs) { + Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/")); + Assert.assertTrue(s, s.endsWith("/000000_0")); + } } private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ @@ -516,6 +518,7 @@ private void checkResult(String[][] expectedResult, String query, boolean isVect @Test public void testMMExportAborted() throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1, 2}, {3, 4}, {5, 6}}; int[][] dataAbort = {{10, 2}}; runStatementOnDriver("drop table if exists T"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index a2adb966fe..a88a570c52 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -207,7 +207,8 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); logResult(LOG, rs); - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs, + expected.length, rs.size()); //verify data and layout for(int i = 0; i < expected.length; i++) { Assert.assertTrue("Actual line 
(data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); diff --git ql/src/test/queries/clientpositive/mm_exim.q ql/src/test/queries/clientpositive/mm_exim.q index c47342bd23..a2b6e08603 100644 --- ql/src/test/queries/clientpositive/mm_exim.q +++ ql/src/test/queries/clientpositive/mm_exim.q @@ -59,13 +59,13 @@ drop table import1_mm; drop table import2_mm; import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; -desc import2_mm; +desc formatted import2_mm; select * from import2_mm order by key, p; drop table import2_mm; drop table import3_mm; import table import3_mm from 'ql/test/data/exports/intermmediate_part'; -desc import3_mm; +desc formatted import3_mm; select * from import3_mm order by key, p; drop table import3_mm; diff --git ql/src/test/results/clientpositive/llap/mm_exim.q.out ql/src/test/results/clientpositive/llap/mm_exim.q.out index 1f40754373..b277fed91d 100644 --- ql/src/test/results/clientpositive/llap/mm_exim.q.out +++ ql/src/test/results/clientpositive/llap/mm_exim.q.out @@ -292,14 +292,41 @@ POSTHOOK: type: IMPORT #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@import2_mm -PREHOOK: query: desc import2_mm +PREHOOK: query: desc formatted import2_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import2_mm -POSTHOOK: query: desc import2_mm +POSTHOOK: query: desc formatted import2_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import2_mm +# col_name data_type comment key int p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numRows 6 + rawDataSize 37 + totalSize 43 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import2_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import2_mm @@ -338,18 +365,45 @@ POSTHOOK: Output: default@import3_mm POSTHOOK: Output: default@import3_mm@p=455 POSTHOOK: Output: default@import3_mm@p=456 POSTHOOK: Output: default@import3_mm@p=457 -PREHOOK: query: desc import3_mm +PREHOOK: query: desc formatted import3_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import3_mm -POSTHOOK: query: desc import3_mm +POSTHOOK: query: desc formatted import3_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import3_mm +# col_name data_type comment key int -p int # Partition Information # col_name data_type comment p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + numFiles 3 + numPartitions 3 + numRows 6 + rawDataSize 13 + totalSize 19 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + 
serialization.format 1 PREHOOK: query: select * from import3_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import3_mm
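Taken together: on export, each selected base/delta is copied under a renamed directory so the exported tree no longer looks like an ACID layout; on import, CopyTask (with its single-child-directory recursion) and the now MM-agnostic CopyWork treat those directories as plain data. A condensed view of the renaming in FileOperations.copyMmPath(), with an illustrative directory name in the comment:

    // e.g. .../mm_table/delta_0000001_0000001_0000 is exported as
    //      <exportRoot>/data/export_old_delta_0000001_0000001_0000
    String fromPathStr = fromPath.toString();
    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
      fromPathStr += Path.SEPARATOR;
    }
    for (Path validPath : validPaths) {
      String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length());
      Path destPath = new Path(exportRootDataDir, mmChildPath);
      exportFileSystem.mkdirs(destPath);
      copyOneDataPath(validPath, destPath);
    }

This is what testMmExim checks: no base_*/delta_* names appear under the export root, and after importing into a new MM table the rows land in a single fresh delta.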