diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1a1ffc5..982f978 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -40,8 +40,11 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.util.StringUtils;
@@ -289,15 +294,73 @@ public class Context {
   }
 
   /**
-   * Get a path to store tmp data destined for external URI.
+   * Get a path to store tmp data destined for external URI. If the path is an s3 path,
+   * delete it only if deleteS3Path is set.
    *
    * @param extURI
    *          external URI to which the tmp data has to be eventually moved
    * @return next available tmp path on the file system corresponding extURI
    */
-  public String getExternalTmpFileURI(URI extURI) {
-    return getExternalScratchDir(extURI) + Path.SEPARATOR + EXT_PREFIX +
-        nextPathId();
+  public String getExternalTmpFileURI(URI extURI, boolean deleteS3Path) {
+    Path extPath = new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath());
+    if (isS3Path(extPath)) {
+      try {
+        // For S3 file systems write directly to output dirs
+        FileSystem fs = extPath.getFileSystem(conf);
+        if (deleteS3Path && !explain) {
+          fs.delete(extPath, true);
+        }
+        return extURI.toString();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return getExternalScratchDir(extURI) + Path.SEPARATOR + EXT_PREFIX + nextPathId();
+    }
+  }
+
+  public boolean isS3Path(Path path) {
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      if (fs instanceof NativeS3FileSystem || fs instanceof S3FileSystem) {
+        return true;
+      } else {
+        return false;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public long getS3CurrentTime(String s3PathStr) {
+    // Get the current time in s3 by creating an s3 file, writing a byte to it,
+    // closing the stream, then reading the file's modification time.
+    Path s3Path = new Path(s3PathStr);
+    Path emptyFile = new Path(s3Path, "___empty");
+    FileSystem fs = null;
+
+    try {
+      fs = s3Path.getFileSystem(conf);
+      if (fs instanceof NativeS3FileSystem || fs instanceof S3FileSystem) {
+        if (fs.isFile(s3Path)) {
+          s3Path = s3Path.getParent();
+        }
+        FSDataOutputStream ds = fs.create(emptyFile);
+        ds.write(10);
+        ds.close();
+        return fs.getFileStatus(emptyFile).getModificationTime();
+      } else {
+        throw new RuntimeException("Not an s3 file system");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        if (fs != null) { fs.delete(emptyFile, false); }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index c031f40..16d7525 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1166,7 +1166,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
     // First create the archive in a tmp dir so that if the job fails, the
     // bad files don't pollute the filesystem
-    Path tmpDir = new Path(driverContext.getCtx().getExternalTmpFileURI(originalDir.toUri()), "partlevel");
+    Path tmpDir = new Path(driverContext.getCtx().getExternalTmpFileURI(originalDir.toUri(), true), "partlevel");
 
     console.printInfo("Creating " + archiveName + " for " + originalDir.toString());
     console.printInfo("in " + tmpDir);
@@ -1322,7 +1322,7 @@
     Path tmpDir = new Path(driverContext
         .getCtx()
-        .getExternalTmpFileURI(originalLocation.toUri()));
+        .getExternalTmpFileURI(originalLocation.toUri(), true));
 
     FileSystem fs = null;
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c9e61a1..7c49e66 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -31,9 +31,11 @@ import java.util.Stack;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -110,9 +113,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
 
     public FSPaths(Path specPath) {
-      tmpPath = Utilities.toTempPath(specPath);
+      if (!fsSupportsMove()) {
+        tmpPath = specPath; // Do not use temp paths if s3.
+      } else {
+        tmpPath = Utilities.toTempPath(specPath);
+      }
       taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
       stat = new Stat();
@@ -191,6 +198,10 @@ private void commit(FileSystem fs) throws HiveException {
       for (int idx = 0; idx < outPaths.length; ++idx) {
+        if (outPaths[idx].makeQualified(fs).equals(finalPaths[idx].makeQualified(fs))) {
+          LOG.info("skipping rename since src=dest:" + outPaths[idx] + "=" + finalPaths[idx]);
+          continue;
+        }
         try {
           if (bDynParts && !fs.exists(finalPaths[idx].getParent())) {
             fs.mkdirs(finalPaths[idx].getParent());
@@ -435,7 +446,7 @@
           bucketMap.put(bucketNum, filesIdx);
           taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
         }
-        if (isNativeTable) {
+        if (isNativeTable && fsSupportsMove()) {
           fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
           LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
           fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
@@ -463,9 +474,17 @@
         e.printStackTrace();
         throw new HiveException(e);
       }
+
+      if (!fsSupportsMove()) {
+        String realTaskId = Utilities.getRealTaskId(taskId);
+        fsp.finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(
+            fsp.tmpPath, realTaskId, jc, hiveOutputFormat, isCompressed, new Path(fsp.tmpPath, realTaskId));
+        fsp.outPaths[filesIdx] = fsp.finalPaths[filesIdx];
+      }
+
       LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
 
-      if (isNativeTable) {
+      if (isNativeTable && fsSupportsMove()) {
         try {
           // in recent hadoop versions, use deleteOnExit to clean tmp files.
           autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
@@ -489,7 +508,7 @@
       assert filesIdx == numFiles;
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
-      if (isNativeTable) {
+      if (isNativeTable && fsSupportsMove()) {
         autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]);
       }
     } catch (HiveException e) {
@@ -630,6 +649,9 @@
           fsp2 = new FSPaths(specPath);
           fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
           fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dpDir);
+          if (!fsSupportsMove()) {
+            cleanS3DynDirectory(fsp2.tmpPath);
+          }
           createBucketFiles(fsp2);
           valToPaths.put(dpDir, fsp2);
         }
@@ -717,7 +739,7 @@
     if (!abort) {
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
-        if (isNativeTable) {
+        if (isNativeTable && fsSupportsMove()) {
           fsp.commit(fs);
         }
       }
@@ -743,6 +765,52 @@
     return new String("FS");
   }
 
+  public void cleanS3DynDirectory(Path s3Path) {
+    // If this path exists in s3, delete files that were last modified
+    // before the dpCtxCreationTime.
+    try {
+      FileSystem fs = s3Path.getFileSystem(hconf);
+      if (fs instanceof NativeS3FileSystem || fs instanceof S3FileSystem) {
+        if (fs.exists(s3Path)) {
+          FileStatus[] fss = fs.listStatus(s3Path);
+          for (FileStatus fStatus : fss) {
+            if (fStatus.getModificationTime() < dpCtx.getCtxCreationTime() ||
+                fStatus.getAccessTime() < dpCtx.getCtxCreationTime()) {
+              LOG.info("Delete old s3 path " + fStatus.getPath() +
+                  " before writing into dynamic partition directory");
+              fs.delete(fStatus.getPath(), true);
+            }
+          }
+        }
+      } else {
+        throw new RuntimeException("When trying to delete old files from s3, path " +
+            s3Path + " is given but it doesn't look like an s3 directory");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public boolean fsSupportsMove() {
+    return fsSupportsMove(this.hconf);
+  }
+
+  public boolean fsSupportsMove(Configuration hconf) {
+    Path specPath = new Path(conf.getDirName());
+    FileSystem fs;
+    try {
+      fs = specPath.getFileSystem(hconf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (fs instanceof NativeS3FileSystem || fs instanceof S3FileSystem) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   @Override
   public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
       throws HiveException {
@@ -750,7 +818,16 @@
       if ((conf != null) && isNativeTable) {
         String specPath = conf.getDirName();
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
-        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
+        if (fsSupportsMove(hconf)) {
+          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
+        } else {
+          // checking for empty buckets only
+          List<String> emptyBuckets = findEmptyBuckets(hconf, specPath, dpCtx);
+          // create empty buckets if necessary
+          if (emptyBuckets.size() > 0) {
+            Utilities.createEmptyBuckets(hconf, emptyBuckets, conf);
+          }
+        }
       }
     } catch (IOException e) {
@@ -758,6 +835,36 @@
     super.jobClose(hconf, success, feedBack);
   }
 
+  private List<String> findEmptyBuckets(Configuration hconf, String specPath, DynamicPartitionCtx dpCtx) throws IOException {
+    List<String> result = new ArrayList<String>();
+    if ((dpCtx != null) && (dpCtx.getNumBuckets() > 0)) {
+      FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+      FileStatus parts[] = Utilities.getFileStatusRecurse(new Path(specPath), dpCtx.getNumDPCols(), fs);
+      for (FileStatus part : parts) {
+        LOG.info("Processing part for buckets: " + part.getPath().toUri().toString());
+        FileStatus[] items = fs.listStatus(part.getPath());
+        if ((items != null) && (items.length > 0)) {
+          String taskId = Utilities.getTaskIdFromFilename(items[0].getPath().getName());
+          Set<String> taskIdsCreated = new HashSet<String>();
+          for (FileStatus item : items) {
+            taskIdsCreated.add(Utilities.getTaskIdFromFilename(item.getPath().getName()));
+            LOG.info("Existing bucket: " + item.getPath().toUri().toString());
+          }
+          for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
+            String expectedTaskId = Utilities.replaceTaskId(taskId, j);
+            if (!taskIdsCreated.contains(expectedTaskId)) {
+              // create empty bucket, file name should be derived from taskID2
+              String pathToAdd = new Path(part.getPath(), expectedTaskId).toUri().toString();
+              LOG.info("Adding path for empty bucket: " + pathToAdd);
+              result.add(pathToAdd);
+            }
+          }
+        }
+      }
+    }
+    return result;
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.FILESINK;
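Reviewer note (not part of the patch): the FileSinkOperator changes above all hang off a single predicate, "does the destination FileSystem support a cheap, atomic rename?". The standalone sketch below illustrates that check under the assumption that keying off the URI scheme is equivalent in spirit to the instanceof checks on S3FileSystem/NativeS3FileSystem used in the patch; the class and method names are illustrative only.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: decide whether output should be staged in a temp dir and renamed.
public final class RenameCapabilitySketch {

  private RenameCapabilitySketch() {
  }

  // True for filesystems with cheap rename (HDFS, local); false for object stores
  // such as s3/s3n, where rename is implemented as copy + delete.
  public static boolean supportsCheapRename(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    URI uri = fs.getUri();
    String scheme = uri.getScheme() == null ? "file" : uri.getScheme().toLowerCase();
    return !("s3".equals(scheme) || "s3n".equals(scheme));
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Prints true for a local path; an s3n:// path would print false.
    System.out.println(supportsCheapRename(new Path("file:///tmp/hive-out"), conf));
  }
}

When the predicate is false the operator writes straight to the final location, which is why commit(), createBucketFiles() and jobClose() above are all guarded with fsSupportsMove().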
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index c6f6755..3eb3389 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -617,6 +617,10 @@
     }
   }
 
+  public static String getRealTaskId(String taskId) {
+    return taskId.substring(0, taskId.lastIndexOf("_"));
+  }
+
   public static HashMap makeMap(Object... olist) {
     HashMap ret = new HashMap();
     for (int i = 0; i < olist.length; i += 2) {
@@ -1152,7 +1156,7 @@
     return (ret);
   }
 
-  private static String replaceTaskId(String taskId, int bucketNum) {
+  public static String replaceTaskId(String taskId, int bucketNum) {
     String strBucketNum = String.valueOf(bucketNum);
     int bucketNumLen = strBucketNum.length();
     int taskIdLen = taskId.length();
@@ -1265,7 +1269,7 @@
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths,
+  public static void createEmptyBuckets(Configuration hconf, List<String> paths,
       FileSinkDesc conf) throws HiveException, IOException {
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index a57f9cf..c5a14bb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -1862,6 +1864,11 @@ public class Hive {
 
   static protected void copyFiles(Path srcf, Path destf, FileSystem fs)
       throws HiveException {
+    if (srcf.makeQualified(fs).equals(destf.makeQualified(fs))) {
+      LOG.debug("skipping copy since src=dest:" + srcf + "=" + destf);
+      return;
+    }
+
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -1926,6 +1933,10 @@
     try {
       FileSystem fs = srcf.getFileSystem(conf);
+      if (srcf.makeQualified(fs).equals(destf.makeQualified(fs))) {
+        LOG.debug("skipping replace since src=dest:" + srcf + "=" + destf);
+        return; // don't replace self
+      }
       // check if srcf contains nested sub-directories
       FileStatus[] srcs;
       try {
@@ -1939,6 +1950,24 @@
       }
       checkPaths(fs, srcs, destf, true);
+      if (fs instanceof NativeS3FileSystem || fs instanceof S3FileSystem) {
+        boolean b = fs.delete(destf, true);
+        LOG.debug("Deleting:"+destf.toString()+",Status:"+b);
+
+        fs.mkdirs(destf);
+        for (FileStatus src : srcs) {
+          FileStatus[] items = fs.listStatus(src.getPath());
+          for(int j=0; j mergeTask = TaskFactory.get(ddlWork, conf);
       TableDesc tblDesc = Utilities.getTableDesc(tblObj);
-      String queryTmpdir = ctx.getExternalTmpFileURI(new URI(tblPartLoc));
+      String queryTmpdir = ctx.getExternalTmpFileURI(new URI(tblPartLoc), false);
       mergeDesc.setOutputDir(queryTmpdir);
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, queryTmpdir, tblDesc, partSpec == null ?
           new HashMap() : partSpec);
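A note on the task-id helpers that this patch makes public in Utilities (getRealTaskId, replaceTaskId): they are plain string manipulation, used later by findEmptyBuckets to predict which bucket files should exist. The sketch below is illustrative only; it assumes task output file names of the form 000005_0 (zero-padded task number plus an attempt suffix) and does not reuse the patch's code.

// Illustrative only: mirrors the string handling the patch relies on.
public final class TaskIdSketch {

  private TaskIdSketch() {
  }

  // Strip the attempt suffix: "000005_0" -> "000005". Assumes an '_' is present.
  static String realTaskId(String taskId) {
    int idx = taskId.lastIndexOf('_');
    return idx < 0 ? taskId : taskId.substring(0, idx);
  }

  // Produce the zero-padded id of another bucket with the same width: ("000005", 3) -> "000003".
  static String withBucket(String realTaskId, int bucketNum) {
    String bucket = String.valueOf(bucketNum);
    StringBuilder padded = new StringBuilder();
    for (int i = 0; i < realTaskId.length() - bucket.length(); i++) {
      padded.append('0');
    }
    return padded.append(bucket).toString();
  }

  public static void main(String[] args) {
    System.out.println(realTaskId("000005_0"));   // 000005
    System.out.println(withBucket("000005", 3));  // 000003
  }
}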
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 3001575..49646b6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -272,11 +272,11 @@ private Task loadTable(URI fromURI, Table table) {
     Path dataPath = new Path(fromURI.toString(), "data");
-    String tmpURI = ctx.getExternalTmpFileURI(fromURI);
+    String tmpURI = ctx.getExternalTmpFileURI(fromURI, false);
     Task copyTask = TaskFactory.get(new CopyWork(dataPath.toString(),
         tmpURI, false), conf);
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpURI.toString(),
-        ctx.getExternalTmpFileURI(fromURI),
+        ctx.getExternalTmpFileURI(fromURI, false),
         Utilities.getTableDesc(table), new TreeMap(), false);
     Task loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
@@ -316,13 +316,13 @@
       LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
           + partSpecToString(addPartitionDesc.getPartSpec())
          + " with source location: " + srcLocation);
-      String tmpURI = ctx.getExternalTmpFileURI(fromURI);
+      String tmpURI = ctx.getExternalTmpFileURI(fromURI, false);
       Task copyTask = TaskFactory.get(new CopyWork(srcLocation, tmpURI, false), conf);
       Task addPartTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
           addPartitionDesc), conf);
       LoadTableDesc loadTableWork = new LoadTableDesc(tmpURI,
-          ctx.getExternalTmpFileURI(fromURI),
+          ctx.getExternalTmpFileURI(fromURI, false),
           Utilities.getTableDesc(table), addPartitionDesc.getPartSpec(), true);
       loadTableWork.setInheritTableSpecs(false);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 6f311c6..08bf826 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -227,7 +227,7 @@
       // might seem redundant in the case
       // that the hive warehouse is also located in the local file system - but
       // that's just a test case.
-      String copyURIStr = ctx.getExternalTmpFileURI(toURI);
+      String copyURIStr = ctx.getExternalTmpFileURI(toURI, false);
       URI copyURI = URI.create(copyURIStr);
       rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr), conf);
@@ -235,8 +235,7 @@
     }
 
     // create final load/move work
-
-    String loadTmpPath = ctx.getExternalTmpFileURI(toURI);
+    String loadTmpPath = ctx.getExternalTmpFileURI(toURI, true);
     Map partSpec = ts.getPartSpec();
     if (partSpec == null) {
       partSpec = new LinkedHashMap();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e1febc3..61176e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1020,7 +1020,7 @@
       String location = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
       try {
         fname = ctx.getExternalTmpFileURI(
-            FileUtils.makeQualified(new Path(location), conf).toUri());
+            FileUtils.makeQualified(new Path(location), conf).toUri(), true);
       } catch (Exception e) {
         throw new SemanticException(generateErrorMessage(ast,
             "Error creating temporary folder on: " + location), e);
@@ -3717,14 +3717,23 @@
       }
 
       boolean isNonNativeTable = dest_tab.isNonNative();
+      boolean deleteS3Path = false;
+
+      if (ctx.isS3Path(dest_path) && dpCtx == null) {
+        deleteS3Path = true;
+      }
+
       if (isNonNativeTable) {
         queryTmpdir = dest_path.toUri().getPath();
       } else {
-        queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+        queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri(), deleteS3Path);
       }
       if (dpCtx != null) {
         // set the root of the temporay path where dynamic partition columns will populate
         dpCtx.setRootPath(queryTmpdir);
+        if (ctx.isS3Path(dest_path)) {
+          dpCtx.setCtxCreationTime(ctx.getS3CurrentTime(queryTmpdir));
+        }
       }
       // this table_desc does not contain the partitioning columns
       table_desc = Utilities.getTableDesc(dest_tab);
@@ -3739,7 +3748,7 @@
       // Create the work for moving the table
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
-        ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri()),
+        ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri(), deleteS3Path),
             table_desc, dpCtx);
         ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(
             dest_tab.getTableName()));
@@ -3799,7 +3808,7 @@
         throw new SemanticException(ErrorMsg.OVERWRITE_ARCHIVED_PART
             .getMsg(qb.getParseInfo().getDestForClause(dest)));
       }
-      queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
+      queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri(), true);
       table_desc = Utilities.getTableDesc(dest_tab);
 
       // Add sorting/bucketing if needed
@@ -3809,7 +3818,7 @@
       currentTableId = destTableId;
       destTableId++;
 
-      ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri()),
+      ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri(), true),
          table_desc, dest_part.getSpec());
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(
           dest_tab.getTableName()));
@@ -3852,7 +3861,7 @@
       try {
         Path qPath = FileUtils.makeQualified(dest_path, conf);
-        queryTmpdir = ctx.getExternalTmpFileURI(qPath.toUri());
+        queryTmpdir = ctx.getExternalTmpFileURI(qPath.toUri(), true);
       } catch (Exception e) {
         throw new SemanticException("Error creating temporary folder on: "
             + dest_path, e);
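Reviewer note (not part of the patch): the new deleteS3Path flag in the SemanticAnalyzer hunks above encodes one small decision, sketched below with hypothetical names — an S3 destination is wiped up front only when the insert has no dynamic partitions; with dynamic partitions the cleanup is deferred to FileSinkOperator.cleanS3DynDirectory, driven by the S3 timestamp recorded in DynamicPartitionCtx.

// Illustrative only: the decision encoded by the deleteS3Path flag.
public final class DeleteUpFrontSketch {

  private DeleteUpFrontSketch() {
  }

  static boolean deleteS3DestinationUpFront(boolean destIsS3, boolean hasDynamicPartitions) {
    return destIsS3 && !hasDynamicPartitions;
  }

  public static void main(String[] args) {
    System.out.println(deleteS3DestinationUpFront(true, false)); // true: static insert overwrite
    System.out.println(deleteS3DestinationUpFront(true, true));  // false: cleanup deferred per partition
  }
}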
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 5c09789..fc69a1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -129,7 +130,10 @@
       FileSystem inpFs = dirPath.getFileSystem(conf);
       DynamicPartitionCtx dpCtx = ctx.getDPCtx();
 
-      if (inpFs.exists(dirPath)) {
+      MoveWork moveWork = (MoveWork) mvTask.getWork();
+      boolean moveBetweenDifferentFileSystem =
+          isFileInDifferentFS(dirName, moveWork.getLoadFileWork().getTargetDir());
+      if (moveBetweenDifferentFileSystem || inpFs.exists(dirPath)) {
         // For each dynamic partition, check if it needs to be merged.
         MapredWork work = (MapredWork) mrTask.getWork();
@@ -158,10 +162,19 @@
           // populate pathToPartitionInfo and pathToAliases w/ DP paths
           long totalSz = 0;
           boolean doMerge = false;
+          if (moveBetweenDifferentFileSystem) {
+            doMerge = true;
+            if (status.length == 0) {
+              // No input file. Create a zero length file to avoid breaking hadoop.
+              inpFs.createNewFile(new Path(dirPath, "empty"));
+              status = Utilities.getFileStatusRecurse(dirPath,
+                  dpCtx.getNumDPCols(), inpFs);
+            }
+          }
           // list of paths that don't need to merge but need to move to the dest location
           List toMove = new ArrayList();
           for (int i = 0; i < status.length; ++i) {
-            long len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
+            long len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize, moveBetweenDifferentFileSystem);
             if (len >= 0) {
               doMerge = true;
               totalSz += len;
@@ -227,12 +240,12 @@
             }
           }
           mrTask.addDependentTask(mvTask);
           }
         } else { // add the move task
           resTsks.add(mvTask);
         }
       } else { // no dynamic partitions
-        long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
+        long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize, moveBetweenDifferentFileSystem);
         if (totalSz >= 0) { // add the merge job
           setupMapRedWork(conf, work, trgtSize, totalSz);
           resTsks.add(mrTask);
@@ -249,6 +262,22 @@
     return resTsks;
   }
 
+  private boolean isFileInDifferentFS(String filename1, String filename2) {
+    URI uri1 = null, uri2 = null;
+    try {
+      uri1 = new URI(filename1);
+      uri2 = new URI(filename2);
+
+      if (uri1 == null || uri2 == null || uri1.getHost() == null || uri2.getHost() == null) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error parsing URIs: " + uri1 + ", " + uri2, e);
+    }
+
+    return !uri1.getHost().equals(uri2.getHost());
+  }
+
   private void setupMapRedWork(HiveConf conf, MapredWork work, long targetSize, long totalSize) {
     if (work.getNumReduceTasks() > 0) {
       int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
@@ -274,10 +303,10 @@
    * If return value is 0 that means there are multiple files each of which is an empty file.
    * This could be true when the table is bucketized and all buckets are empty.
    */
-  private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
+  private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize, boolean moveBetweenDifferentFileSystem) {
     try {
       FileStatus[] fStats = inpFs.listStatus(dirPath);
-      if (fStats.length <= 1) {
+      if ((!moveBetweenDifferentFileSystem) && (fStats.length <= 1)) {
         return -1;
       }
       long totalSz = 0;
@@ -285,7 +314,7 @@
         totalSz += fStat.getLen();
       }
 
-      if (totalSz < avgSize * fStats.length) {
+      if (moveBetweenDifferentFileSystem || (totalSz < avgSize * fStats.length)) {
         return totalSz;
       } else {
         return -1;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 29cfe9d..57d22d1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -49,6 +49,8 @@
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode;    // maximum dynamic partitions created per mapper/reducer
 
+  private long ctxCreationTime;   // When this context is created.
+
   public DynamicPartitionCtx() {
   }
 
@@ -90,6 +92,7 @@
     this.dpNames = dp.dpNames;
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
+    this.ctxCreationTime = dp.ctxCreationTime;
   }
 
   public void mapInputToDP(List fs) {
@@ -136,6 +139,14 @@
     return this.rootPath;
   }
 
+  public void setCtxCreationTime(long ctxCreationTime) {
+    this.ctxCreationTime = ctxCreationTime;
+  }
+
+  public long getCtxCreationTime() {
+    return this.ctxCreationTime;
+  }
+
   public List getDPColNames() {
     return this.dpNames;
   }
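Reviewer note (not part of the patch): DynamicPartitionCtx.ctxCreationTime, Context.getS3CurrentTime and FileSinkOperator.cleanS3DynDirectory together implement a "watermark" scheme — learn the object store's notion of now, remember it, and later delete any output whose modification time predates it. The sketch below restates that idea with hypothetical class and file names; it is a simplification, not the patch's code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: the S3 "watermark" idea used by the patch.
public final class S3WatermarkSketch {

  private S3WatermarkSketch() {
  }

  // Write and stat a throw-away marker file to learn the store's current time.
  public static long currentTime(FileSystem fs, Path dir) throws IOException {
    Path marker = new Path(dir, "___watermark");
    FSDataOutputStream out = fs.create(marker, true);
    try {
      out.write(0);
    } finally {
      out.close();
    }
    long ts = fs.getFileStatus(marker).getModificationTime();
    fs.delete(marker, false);
    return ts;
  }

  // Delete direct children of dir whose modification time predates the watermark.
  public static void deleteOlderThan(FileSystem fs, Path dir, long watermark) throws IOException {
    if (!fs.exists(dir)) {
      return;
    }
    for (FileStatus child : fs.listStatus(dir)) {
      if (child.getModificationTime() < watermark) {
        fs.delete(child.getPath(), true);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path("file:///tmp/hive-dp-demo");
    FileSystem fs = dir.getFileSystem(conf);
    fs.mkdirs(dir);
    long watermark = currentTime(fs, dir);
    deleteOlderThan(fs, dir, watermark);
  }
}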