diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index c084fa054c..74a9d727cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -31,7 +31,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; + import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -162,8 +164,10 @@ } public class FSPaths implements Cloneable { - private Path tmpPath; - private Path taskOutputTempPath; + private Path tmpPathRoot; + private String subdirBeforeTxn, subdirAfterTxn; + private final String subdirForTxn; + private Path taskOutputTempPathRoot; Path[] outPaths; Path[] finalPaths; RecordWriter[] outWriters; @@ -172,25 +176,23 @@ int acidLastBucket = -1; int acidFileOffset = -1; private boolean isMmTable; - private Long writeId; - private int stmtId; - String dpDir; + String dpDirForCounters; public FSPaths(Path specPath, boolean isMmTable) { this.isMmTable = isMmTable; if (!isMmTable) { - tmpPath = Utilities.toTempPath(specPath); - taskOutputTempPath = Utilities.toTaskTempPath(specPath); + tmpPathRoot = Utilities.toTempPath(specPath); + taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath); + subdirForTxn = null; } else { - tmpPath = specPath; - taskOutputTempPath = null; // Should not be used. - writeId = conf.getTableWriteId(); - stmtId = conf.getStatementId(); + tmpPathRoot = specPath; + taskOutputTempPathRoot = null; // Should not be used. + subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), + conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId()); } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts - + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath - + " (spec path " + specPath + ")"/*, new Exception()*/); + Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles + + " files, dynParts = " + bDynParts + " (spec path " + specPath + ")"); } outPaths = new Path[numFiles]; @@ -203,24 +205,6 @@ public FSPaths(Path specPath, boolean isMmTable) { stat = new Stat(); } - /** - * Update OutPath according to tmpPath. - */ - public Path getTaskOutPath(String taskId) { - return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId)); - } - - /** - * Update the final paths according to tmpPath. - */ - private Path getFinalPath(String taskId, Path tmpPath, String extension) { - if (extension != null) { - return new Path(tmpPath, taskId + extension); - } else { - return new Path(tmpPath, taskId); - } - } - public void closeWriters(boolean abort) throws HiveException { for (int idx = 0; idx < outWriters.length; idx++) { if (outWriters[idx] != null) { @@ -307,45 +291,31 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi } } - public void configureDynPartPath(String dirName, String childSpecPathDynLinkedPartitions) { - dirName = (childSpecPathDynLinkedPartitions == null) ? 
dirName : - dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions; - tmpPath = new Path(tmpPath, dirName); - if (taskOutputTempPath != null) { - taskOutputTempPath = new Path(taskOutputTempPath, dirName); - } - } - public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable, boolean isSkewedStoredAsSubDirectories) { if (isNativeTable) { String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); + String taskWithExt = extension == null ? taskId : taskId + extension; if (!isMmTable) { if (!bDynParts && !isSkewedStoredAsSubDirectories) { - finalPaths[filesIdx] = getFinalPath(taskId, parent, extension); + finalPaths[filesIdx] = new Path(parent, taskWithExt); } else { - finalPaths[filesIdx] = getFinalPath(taskId, tmpPath, extension); + finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt); } - outPaths[filesIdx] = getTaskOutPath(taskId); + outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId)); } else { - String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), writeId, writeId, stmtId); - if (unionPath != null) { - // Create the union directory inside the MM directory. - subdirPath += Path.SEPARATOR + unionPath; - } - subdirPath += Path.SEPARATOR + taskId; + String taskIdPath = taskId; if (conf.isMerge()) { // Make sure we don't collide with the source files. // MM tables don't support concat so we don't expect the merge of merged files. - subdirPath += ".merged"; + taskIdPath += ".merged"; } - Path finalPath = null; - if (!bDynParts && !isSkewedStoredAsSubDirectories) { - finalPath = getFinalPath(subdirPath, specPath, extension); - } else { - // Note: tmpPath here has the correct partition key - finalPath = getFinalPath(subdirPath, tmpPath, extension); + if (extension != null) { + taskIdPath += extension; } + + Path finalPath = new Path(buildTmpPath(), taskIdPath); + // In the cases that have multi-stage insert, e.g. a "hive.skewjoin.key"-based skew join, // it can happen that we want multiple commits into the same directory from different // tasks (not just task instances). In non-MM case, Utilities.renameOrMoveFiles ensures @@ -373,12 +343,31 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } } - public Path getTmpPath() { - return tmpPath; + public Path buildTmpPath() { + String pathStr = tmpPathRoot.toString(); + if (subdirBeforeTxn != null) { + pathStr += Path.SEPARATOR + subdirBeforeTxn; + } + if (subdirForTxn != null) { + pathStr += Path.SEPARATOR + subdirForTxn; + } + if (subdirAfterTxn != null) { + pathStr += Path.SEPARATOR + subdirAfterTxn; + } + return new Path(pathStr); } - public Path getTaskOutputTempPath() { - return taskOutputTempPath; + public Path buildTaskOutputTempPath() { + if (taskOutputTempPathRoot == null) return null; + assert subdirForTxn == null; + String pathStr = taskOutputTempPathRoot.toString(); + if (subdirBeforeTxn != null) { + pathStr += Path.SEPARATOR + subdirBeforeTxn; + } + if (subdirAfterTxn != null) { + pathStr += Path.SEPARATOR + subdirAfterTxn; + } + return new Path(pathStr); } public void addToStat(String statType, long amount) { @@ -440,7 +429,7 @@ private void initializeSpecPath() { // 'Parent' boolean isLinked = conf.isLinkedFileSink(); if (!isLinked) { - // Simple case - no union. + // Simple case - no union. 
specPath = conf.getDirName(); unionPath = null; } else { @@ -543,10 +532,11 @@ protected void initializeOp(Configuration hconf) throws HiveException { if (!bDynParts) { fsp = new FSPaths(specPath, conf.isMmTable()); + fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp) - + " from ctor; childSpec " + unionPath + ": tmpPath " + fsp.getTmpPath() - + ", task path " + fsp.getTaskOutputTempPath()); + + " from ctor; childSpec " + unionPath + ": tmpPath " + fsp.buildTmpPath() + + ", task path " + fsp.buildTaskOutputTempPath()); } // Create all the files - this is required because empty files need to be created for @@ -562,7 +552,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { if (isTemporary && fsp != null && tmpStorage != StoragePolicyValue.DEFAULT) { assert !conf.isMmTable(); // Not supported for temp tables. - final Path outputPath = fsp.taskOutputTempPath; + final Path outputPath = fsp.buildTaskOutputTempPath(); StoragePolicyShim shim = ShimLoader.getHadoopShims() .getStoragePolicyShim(fs); if (shim != null) { @@ -723,7 +713,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx] + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path " - + fsp.getTmpPath() + ", task " + taskId + ")"); + + fsp.buildTmpPath() + ", task " + taskId + ")"); } if (LOG.isInfoEnabled()) { LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); @@ -777,8 +767,8 @@ private void updateDPCounters(final FSPaths fsp, final int filesIdx) { // 1) Insert overwrite (all partitions are newly created) // 2) Insert into table which creates new partitions (some new partitions) - if (bDynParts && destTablePath != null && fsp.dpDir != null) { - Path destPartPath = new Path(destTablePath, fsp.dpDir); + if (bDynParts && destTablePath != null && fsp.dpDirForCounters != null) { + Path destPartPath = new Path(destTablePath, fsp.dpDirForCounters); // For MM tables, directory structure is // /// @@ -861,7 +851,9 @@ public void process(Object row, int tag) throws HiveException { if (!bDynParts && !filesCreated) { if (lbDirName != null) { - FSPaths fsp2 = lookupListBucketingPaths(lbDirName); + if (valToPaths.get(lbDirName) == null) { + createNewPaths(null, lbDirName); + } } else { createBucketFiles(fsp); } @@ -910,7 +902,10 @@ public void process(Object row, int tag) throws HiveException { recordValue = serializer.serialize(row, subSetOI); } else { if (lbDirName != null) { - fpaths = lookupListBucketingPaths(lbDirName); + fpaths = valToPaths.get(lbDirName); + if (fpaths == null) { + fpaths = createNewPaths(null, lbDirName); + } } else { fpaths = fsp; } @@ -1058,44 +1053,37 @@ assert getConf().getWriteType() != AcidUtils.Operation.DELETE && } /** - * Lookup list bucketing path. - * @param lbDirName - * @return - * @throws HiveException - */ - protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { - FSPaths fsp2 = valToPaths.get(lbDirName); - if (fsp2 == null) { - fsp2 = createNewPaths(lbDirName); - } - return fsp2; - } - - /** * create new path. 
* * @param dirName * @return * @throws HiveException */ - private FSPaths createNewPaths(String dirName) throws HiveException { + private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException { FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); - fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? unionPath : null); + fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath); + fsp2.subdirBeforeTxn = dpDir; + String pathKey = combinePathFragments(dpDir, lbDir); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp2) + " for " - + dirName + ", childSpec " + unionPath + ": tmpPath " + fsp2.getTmpPath() - + ", task path " + fsp2.getTaskOutputTempPath()); + Utilities.FILE_OP_LOGGER.trace("creating new paths {} for {}, childSpec {}: tmpPath {}," + + " task path {}", System.identityHashCode(fsp2), pathKey, unionPath, + fsp2.buildTmpPath(), fsp2.buildTaskOutputTempPath()); } + if (bDynParts) { - fsp2.dpDir = dirName; + fsp2.dpDirForCounters = pathKey; } if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { createBucketFiles(fsp2); - valToPaths.put(dirName, fsp2); + valToPaths.put(pathKey, fsp2); } return fsp2; } + private static String combinePathFragments(String first, String second) { + return first == null ? second : (second == null ? first : first + Path.SEPARATOR + second); + } + /** * Generate list bucketing directory name from a row. * @param row row to process. @@ -1107,42 +1095,52 @@ protected String generateListBucketingDirName(Object row) { } String lbDirName = null; - List standObjs = new ArrayList(); List skewedCols = lbCtx.getSkewedColNames(); List> allSkewedVals = lbCtx.getSkewedColValues(); - List skewedValsCandidate = null; Map, String> locationMap = lbCtx.getLbLocationMap(); - /* Convert input row to standard objects. */ - ObjectInspectorUtils.copyToStandardObject(standObjs, row, - (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + if (row != null) { + List standObjs = new ArrayList(); + List skewedValsCandidate = null; + /* Convert input row to standard objects. */ + ObjectInspectorUtils.copyToStandardObject(standObjs, row, + (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - assert (standObjs.size() >= skewedCols.size()) : - "The row has less number of columns than no. of skewed column."; + assert (standObjs.size() >= skewedCols.size()) : + "The row has less number of columns than no. of skewed column."; - skewedValsCandidate = new ArrayList(skewedCols.size()); - for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) { - skewedValsCandidate.add(posPair.getSkewColPosition(), - standObjs.get(posPair.getTblColPosition()).toString()); - } - /* The row matches skewed column names. */ - if (allSkewedVals.contains(skewedValsCandidate)) { - /* matches skewed values. */ - lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate); - locationMap.put(skewedValsCandidate, lbDirName); + skewedValsCandidate = new ArrayList(skewedCols.size()); + for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) { + skewedValsCandidate.add(posPair.getSkewColPosition(), + standObjs.get(posPair.getTblColPosition()).toString()); + } + /* The row matches skewed column names. */ + if (allSkewedVals.contains(skewedValsCandidate)) { + /* matches skewed values. 
*/ + lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate); + locationMap.put(skewedValsCandidate, lbDirName); + } else { + lbDirName = createDefaultLbDir(skewedCols, locationMap); + } } else { - /* create default directory. */ - lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols, + lbDirName = createDefaultLbDir(skewedCols, locationMap); + } + return lbDirName; + } + + private String createDefaultLbDir(List skewedCols, + Map, String> locationMap) { + String lbDirName; + lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols, lbCtx.getDefaultDirName()); - List defaultKey = Lists.newArrayList(lbCtx.getDefaultKey()); - if (!locationMap.containsKey(defaultKey)) { - locationMap.put(defaultKey, lbDirName); - } + List defaultKey = Lists.newArrayList(lbCtx.getDefaultKey()); + if (!locationMap.containsKey(defaultKey)) { + locationMap.put(defaultKey, lbDirName); } return lbDirName; } - protected FSPaths getDynOutPaths(List row, String lbDirName) throws HiveException { + protected FSPaths getDynOutPaths(List row, String lbDir) throws HiveException { FSPaths fp; @@ -1151,12 +1149,12 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive String pathKey = null; if (dpDir != null) { - dpDir = appendToSource(lbDirName, dpDir); - pathKey = dpDir; - if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { + String dpAndLbDir = combinePathFragments(dpDir, lbDir); + pathKey = dpAndLbDir; + if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { String buckNum = row.get(row.size() - 1); taskId = Utilities.replaceTaskIdFromFilename(taskId, buckNum); - pathKey = appendToSource(taskId, dpDir); + pathKey = dpAndLbDir + Path.SEPARATOR + taskId; } FSPaths fsp2 = valToPaths.get(pathKey); @@ -1200,7 +1198,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive prevFsp = null; } - fsp2 = createNewPaths(dpDir); + fsp2 = createNewPaths(dpDir, lbDir); if (prevFsp == null) { prevFsp = fsp2; } @@ -1218,19 +1216,6 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive return fp; } - /** - * Append dir to source dir - * @param appendDir - * @param srcDir - * @return - */ - private String appendToSource(String appendDir, String srcDir) { - StringBuilder builder = new StringBuilder(srcDir); - srcDir = (appendDir == null) ? srcDir : builder.append(Path.SEPARATOR).append(appendDir) - .toString(); - return srcDir; - } - // given the current input row, the mapping for input col info to dp columns, and # of dp cols, // return the relative path corresponding to the row. // e.g., ds=2008-04-08/hr=11 @@ -1457,7 +1442,7 @@ private void publishStats() throws HiveException { FSPaths fspValue = entry.getValue(); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Observing entry for stats " + fspKey - + " => FSP with tmpPath " + fspValue.getTmpPath()); + + " => FSP with tmpPath " + fspValue.buildTmpPath()); } // for bucketed tables, hive.optimize.sort.dynamic.partition optimization // adds the taskId to the fspKey. 
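Note on the new FSPaths layout (not part of the patch itself): instead of mutating a single tmpPath, the operator now keeps a root plus three optional fragments and concatenates them on demand in buildTmpPath() - the dynamic-partition subdirectory (subdirBeforeTxn), the base/delta subdirectory used only for insert-only (MM) tables (subdirForTxn), and the list-bucketing/union subdirectory (subdirAfterTxn). The sketch below mirrors that composition; the class name and all sample values (table path, partition dir, delta name, skew dirs) are hypothetical.

```java
// Illustrative sketch only - mirrors the composition done by FSPaths.buildTmpPath() above.
// All sample values are hypothetical; only org.apache.hadoop.fs.Path is real.
import org.apache.hadoop.fs.Path;

public class FsPathsLayoutSketch {
  public static void main(String[] args) {
    Path tmpPathRoot = new Path("hdfs://nn/warehouse/skew_mm"); // specPath for an MM table
    String subdirBeforeTxn = "p=455";                           // dynamic partition dirs
    String subdirForTxn = "delta_0000001_0000001_0000";         // MM base/delta dir
    String subdirAfterTxn = "k1=0/k4=0";                        // list bucketing (+ union) dirs

    String pathStr = tmpPathRoot.toString();
    if (subdirBeforeTxn != null) {
      pathStr += Path.SEPARATOR + subdirBeforeTxn;
    }
    if (subdirForTxn != null) {
      pathStr += Path.SEPARATOR + subdirForTxn;
    }
    if (subdirAfterTxn != null) {
      pathStr += Path.SEPARATOR + subdirAfterTxn;
    }
    // Prints: hdfs://nn/warehouse/skew_mm/p=455/delta_0000001_0000001_0000/k1=0/k4=0
    System.out.println(new Path(pathStr));
  }
}
```

For a non-MM table, subdirForTxn stays null and the same composition reduces to root/DP-dirs/LB-dirs, which matches the previous behavior.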
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 25035433c7..4267c130f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4072,9 +4072,9 @@ private static void tryDelete(FileSystem fs, Path path) {
   }
 
   public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
-      int lbLevels, PathFilter filter, long writeId, int stmtId, Configuration conf,
+      PathFilter filter, long writeId, int stmtId, Configuration conf,
       Boolean isBaseDir) throws IOException {
-    int skipLevels = dpLevels + lbLevels;
+    int skipLevels = dpLevels;
     if (filter == null) {
       filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     }
@@ -4186,7 +4186,7 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif
       int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId,
       Configuration conf) throws IOException {
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, conf, null);
+        fs, specPath, dpLevels, filter, writeId, stmtId, conf, null);
     if (files != null) {
       for (Path path : files) {
         Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path);
@@ -4281,7 +4281,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, hconf, isInsertOverwrite);
+        fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite);
     ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4296,6 +4296,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
         int fileCount = mdis.readInt();
         for (int i = 0; i < fileCount; ++i) {
           String nextFile = mdis.readUTF();
+          Utilities.FILE_OP_LOGGER.trace("Looking at committed file: {}", nextFile);
           if (!committed.add(nextFile)) {
             throw new HiveException(nextFile + " was specified in multiple manifests");
           }
@@ -4318,7 +4319,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
 
     for (Path path : mmDirectories) {
-      cleanMmDirectory(path, fs, unionSuffix, committed);
+      cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed);
     }
 
     if (!committed.isEmpty()) {
@@ -4356,10 +4357,28 @@ public PathOnlyFileStatus(Path path) {
     }
   }
 
-  private static void cleanMmDirectory(Path dir, FileSystem fs,
-      String unionSuffix, HashSet<String> committed) throws IOException, HiveException {
+  private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix,
+      int lbLevels, HashSet<String> committed) throws IOException, HiveException {
     for (FileStatus child : fs.listStatus(dir)) {
       Path childPath = child.getPath();
+      if (lbLevels > 0) {
+        // We need to recurse into some LB directories. We don't check the directories themselves
+        // for matches; if they are empty they don't matter, and we will delete bad files.
+        // This recursion is not the most efficient way to do this, but LB is rarely used.
+        if (child.isDirectory()) {
+          Utilities.FILE_OP_LOGGER.trace(
+              "Recursion into LB directory {}; levels remaining {}", childPath, lbLevels - 1);
+          cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed);
+        } else {
+          if (committed.contains(childPath.toString())) {
+            throw new HiveException("LB FSOP has committed "
+                + childPath + " outside of LB directory levels " + lbLevels);
+          }
+          deleteUncommitedFile(childPath, fs);
+        }
+        continue;
+      }
+      // No more LB directories expected.
       if (unionSuffix == null) {
         if (committed.remove(childPath.toString())) {
           continue; // A good file.
@@ -4368,15 +4387,21 @@ private static void cleanMmDirectory(Path dir, FileSystem fs,
       } else if (!child.isDirectory()) {
         if (committed.contains(childPath.toString())) {
           throw new HiveException("Union FSOP has commited "
-              + childPath + " outside of union directory" + unionSuffix);
+              + childPath + " outside of union directory " + unionSuffix);
         }
         deleteUncommitedFile(childPath, fs);
       } else if (childPath.getName().equals(unionSuffix)) {
         // Found the right union directory; treat it as "our" MM directory.
-        cleanMmDirectory(childPath, fs, null, committed);
+        cleanMmDirectory(childPath, fs, null, 0, committed);
       } else {
-        Utilities.FILE_OP_LOGGER.trace("FSOP for {} is ignoring the other side of the union {}",
-            unionSuffix, childPath);
+        String childName = childPath.getName();
+        if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX)
+            && !childName.startsWith(".") && !childName.startsWith("_")) {
+          throw new HiveException("Union FSOP has an unknown directory "
+              + childPath + " outside of union directory " + unionSuffix);
+        }
+        Utilities.FILE_OP_LOGGER.trace(
+            "FSOP for {} is ignoring the other side of the union {}", unionSuffix, childPath);
       }
     }
   }
@@ -4395,13 +4420,12 @@ private static void deleteUncommitedFile(Path childPath, FileSystem fs)
    * if the entire directory is valid (has no uncommitted/temporary files).
    */
   public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
-      ValidWriteIdList validWriteIdList, int lbLevels) throws IOException {
+      ValidWriteIdList validWriteIdList) throws IOException {
     Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", path);
     // NULL means this directory is entirely valid.
     List<Path> result = null;
     FileSystem fs = path.getFileSystem(conf);
-    FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
-        : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*"));
+    FileStatus[] children = fs.listStatus(path);
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4661881301..f6608eb827 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2074,12 +2074,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     Utilities.FILE_OP_LOGGER.trace(
         "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP);
     Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-        fs, loadPath, numDP, numLB, null, writeId, -1, conf, isInsertOverwrite);
+        fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite);
     for (Path p : leafStatus) {
       Path dpPath = p.getParent(); // Skip the MM directory that we have found.
-      for (int i = 0; i < numLB; ++i) {
-        dpPath = dpPath.getParent(); // Now skip the LB directories, if any...
-      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("Found DP " + dpPath);
       }
diff --git ql/src/test/queries/clientpositive/mm_all.q ql/src/test/queries/clientpositive/mm_all.q
index 4ffbb6b98a..ceef0c072d 100644
--- ql/src/test/queries/clientpositive/mm_all.q
+++ ql/src/test/queries/clientpositive/mm_all.q
@@ -9,7 +9,6 @@ set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
-
 -- Force multiple writers when reading
 drop table intermediate;
 create table intermediate(key int) partitioned by (p int) stored as orc;
@@ -119,6 +118,8 @@ drop table partunion_mm;
 
 
 
+set mapreduce.input.fileinputformat.input.dir.recursive=true;
+
 create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4)
 on ((0,0),(1,1),(2,2),(3,3)) stored as directories
 tblproperties ("transactional"="true", "transactional_properties"="insert_only");
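A reduced sketch (not the patch itself) of the cleanup shape that Utilities.cleanMmDirectory moves to: MM delta directories are now located at the dynamic-partition depth only, and list-bucketing (LB) subdirectories are handled by recursing inside each delta directory. Union handling and the error checks for committed files found at the wrong depth are deliberately omitted here; the class and method names below are hypothetical. The mapreduce.input.fileinputformat.input.dir.recursive setting added to mm_all.q above presumably lets readers descend into those extra LB subdirectories under the delta.

```java
// Simplified stand-in for the recursive cleanup in Utilities.cleanMmDirectory.
// "committed" holds the file paths read from the manifests; anything not listed
// there is treated as uncommitted and removed.
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LbCleanupSketch {
  static void clean(FileSystem fs, Path dir, int lbLevels, Set<String> committed)
      throws IOException {
    for (FileStatus child : fs.listStatus(dir)) {
      Path childPath = child.getPath();
      if (lbLevels > 0 && child.isDirectory()) {
        // Descend one LB level; empty LB directories are simply left alone.
        clean(fs, childPath, lbLevels - 1, committed);
      } else if (committed.remove(childPath.toString())) {
        continue; // a committed file - keep it
      } else {
        fs.delete(childPath, true); // not in any manifest => uncommitted, remove it
      }
    }
  }
}
```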