diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 6c053a2..4f35511 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -120,7 +120,7 @@ private void loadHashTable() throws HiveException {
     }
 
     String baseDir = null;
-    String currentInputFile = getExecContext().getCurrentInputFile();
+    String currentInputFile = getExecContext().getCurrentInputPath().toString();
     LOG.info("******* Load from HashTable File: input : " + currentInputFile);
     String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
     try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index faf2f02..37b7fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -441,7 +441,7 @@ public void closeOp(boolean abort) throws HiveException {
   // Find context for current input file
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
-    Path fpath = normalizePath(getExecContext().getCurrentInputFile());
+    Path fpath = getExecContext().getCurrentInputPath();
 
     for (String onefile : conf.getPathToAliases().keySet()) {
       Path onepath = normalizePath(onefile);
@@ -537,7 +537,7 @@ public void process(Writable value) throws HiveException {
           VirtualColumn vc = vcs.get(i);
           if (vc.equals(VirtualColumn.FILENAME)) {
             if (ctx.inputFileChanged()) {
-              vcValues[i] = new Text(ctx.getCurrentInputFile());
+              vcValues[i] = new Text(ctx.getCurrentInputPath().toString());
             }
           } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) {
             long current = ctx.getIoCxt().getCurrentBlockStart();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 653f40a..e7c9dd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -515,19 +515,19 @@ private boolean processKey(byte alias, ArrayList key)
   private void setUpFetchContexts(String alias, MergeQueue mergeQueue) throws HiveException {
     mergeQueue.clearFetchContext();
 
-    String currentInputFile = getExecContext().getCurrentInputFile();
+    Path currentInputPath = getExecContext().getCurrentInputPath();
 
     BucketMapJoinContext bucketMatcherCxt = localWork.getBucketMapjoinContext();
     Class bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
     BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null);
 
-    getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputFile));
+    getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputPath.toString()));
     LOG.info("set task id: " + getExecContext().getFileId());
 
     bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
         .getAliasBucketFileNameMapping());
 
-    List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+    List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputPath.toString(),
         bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
 
     mergeQueue.setupContext(aliasFiles);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index ba02130..eb09d1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -105,9 +105,9 @@ public void cleanUpInputFileChangedOp() throws HiveException {
     Map bucketNameMapping =
         (conf != null) ? conf.getBucketFileNameMapping() : null;
     if ((bucketNameMapping != null) && (!bucketNameMapping.isEmpty())) {
-      String currentInputFile = getExecContext().getCurrentInputFile();
+      Path currentInputPath = getExecContext().getCurrentInputPath();
       getExecContext().setFileId(Integer.toString(bucketNameMapping.get(
-          Utilities.getFileNameFromDirName(currentInputFile))));
+          currentInputPath.getName())));
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 0191aef..74bc2d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -20,6 +20,7 @@
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -29,14 +30,14 @@
 
   public static final Log l4j = ExecMapper.l4j;
 
-  // lastInputFile should be changed by the root of the operator tree ExecMapper.map()
+  // lastInputPath should be changed by the root of the operator tree ExecMapper.map()
   // but kept unchanged throughout the operator tree for one row
-  private String lastInputFile = null;
+  private Path lastInputPath = null;
 
-  // currentInputFile will be updated only by inputFileChanged(). If inputFileChanged()
-  // is not called throughout the opertor tree, currentInputFile won't be used anyways
+  // currentInputPath will be updated only by inputFileChanged(). If inputFileChanged()
+  // is not called throughout the operator tree, currentInputPath won't be used anyways
   // so it won't be updated.
-  private String currentInputFile = null;
+  private Path currentInputPath = null;
   private boolean inputFileChecked = false;
 
   // for SMB join, replaced with number part of task-id , making output file name
@@ -80,10 +81,10 @@ public void clear() {
    */
   public boolean inputFileChanged() {
     if (!inputFileChecked) {
-      currentInputFile = this.ioCxt.getInputFile();
+      currentInputPath = this.ioCxt.getInputPath();
       inputFileChecked = true;
     }
-    return lastInputFile == null || !lastInputFile.equals(currentInputFile);
+    return lastInputPath == null || !lastInputPath.equals(currentInputPath);
   }
 
   /**
@@ -93,25 +94,25 @@ public boolean inputFileChanged() {
    */
  public void resetRow() {
-    // Update the lastInputFile with the currentInputFile.
+    // Update the lastInputPath with the currentInputPath.
-    lastInputFile = currentInputFile;
+    lastInputPath = currentInputPath;
     inputFileChecked = false;
   }
 
-  public String getLastInputFile() {
-    return lastInputFile;
+  public Path getLastInputPath() {
+    return lastInputPath;
   }
 
-  public void setLastInputFile(String lastInputFile) {
-    this.lastInputFile = lastInputFile;
+  public void setLastInputPath(Path lastInputPath) {
+    this.lastInputPath = lastInputPath;
   }
 
-  public String getCurrentInputFile() {
-    currentInputFile = this.ioCxt.getInputFile();
-    return currentInputFile;
+  public Path getCurrentInputPath() {
+    currentInputPath = this.ioCxt.getInputPath();
+    return currentInputPath;
   }
 
-  public void setCurrentInputFile(String currentInputFile) {
-    this.currentInputFile = currentInputFile;
+  public void setCurrentInputPath(Path currentInputPath) {
+    this.currentInputPath = currentInputPath;
   }
 
   public JobConf getJc() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index e50fb3e..dd5cb6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -146,12 +146,12 @@ public IOContext getIOContext() {
     return IOContext.get();
   }
 
-  public void initIOContext(long startPos, boolean isBlockPointer, String inputFile) {
+  public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
     ioCxtRef.isBlockPointer = isBlockPointer;
-    ioCxtRef.inputFile = inputFile;
-    LOG.info("Processing file " + inputFile);
+    ioCxtRef.inputPath = inputPath;
+    LOG.info("Processing file " + inputPath);
     initDone = true;
   }
 
@@ -184,7 +184,7 @@ public void initIOContext(FileSplit split, JobConf job,
       blockStart = in.getPosition();
       in.close();
     }
-    this.initIOContext(blockStart, blockPointer, path.makeQualified(fs).toString());
+    this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
     this.initIOContextSortedProps(split, recordReader, job);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 1ca530c..45a49c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.fs.Path;
+
 /**
  * IOContext basically contains the position information of the current
@@ -67,7 +69,7 @@ public static void clear() {
     UNKNOWN
   }
 
-  String inputFile;
+  Path inputPath;
 
   public IOContext() {
     this.currentBlockStart = 0;
@@ -109,12 +111,12 @@ public void setBlockPointer(boolean isBlockPointer) {
     this.isBlockPointer = isBlockPointer;
   }
 
-  public String getInputFile() {
-    return inputFile;
+  public Path getInputPath() {
+    return inputPath;
  }
 
-  public void setInputFile(String inputFile) {
-    this.inputFile = inputFile;
+  public void setInputPath(Path inputPath) {
+    this.inputPath = inputPath;
   }
 
   public void setIOExceptions(boolean ioe) {
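
Reviewer note: a minimal, self-contained sketch, kept outside the patch (the class name and sample paths are hypothetical), illustrating why the String-to-Path migration preserves the semantics the patch relies on. Path.equals() compares the underlying URIs, so the lastInputPath/currentInputPath comparison in inputFileChanged() behaves exactly like the old qualified-String comparison; Path.getName() returns the final path component, which is what replaces Utilities.getFileNameFromDirName() in TableScanOperator; and toString() round-trips to the qualified string for call sites that still need a String, such as createFileId() and the FILENAME virtual column.

    import org.apache.hadoop.fs.Path;

    public class PathSemanticsSketch {
      public static void main(String[] args) {
        // Two Paths built from the same qualified string are equal,
        // mirroring the old String.equals() check in inputFileChanged().
        Path a = new Path("hdfs://nn:8020/warehouse/t1/bucket_00000");
        Path b = new Path("hdfs://nn:8020/warehouse/t1/bucket_00000");
        System.out.println(a.equals(b));   // true

        // getName() yields the final path component, the same value
        // Utilities.getFileNameFromDirName() extracted from the String form.
        System.out.println(a.getName());   // bucket_00000

        // toString() reproduces the qualified string, so String-based
        // call sites (createFileId, new Text(...)) keep their old inputs.
        System.out.println(a.toString());  // hdfs://nn:8020/warehouse/t1/bucket_00000
      }
    }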