diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/VerifyContentSummaryCacheHook.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/VerifyContentSummaryCacheHook.java index cdb9d03..c340019 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/VerifyContentSummaryCacheHook.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/hooks/VerifyContentSummaryCacheHook.java @@ -22,20 +22,21 @@ import junit.framework.Assert; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; public class VerifyContentSummaryCacheHook implements ExecuteWithHookContext { public void run(HookContext hookContext) { - Map inputToCS = hookContext.getInputPathToContentSummary(); + Map inputToCS = hookContext.getInputPathToContentSummary(); if (hookContext.getHookType().equals(HookType.PRE_EXEC_HOOK)) { Assert.assertEquals(0, inputToCS.size()); } else { Assert.assertEquals(1, inputToCS.size()); String tmp_dir = System.getProperty("test.tmp.dir"); - for (String key : inputToCS.keySet()) { - if (!key.equals(tmp_dir + "/VerifyContentSummaryCacheHook") && - !key.equals("pfile:" + tmp_dir + "/VerifyContentSummaryCacheHook")) { + for (Path key : inputToCS.keySet()) { + if (!key.equals(new Path(tmp_dir + "/VerifyContentSummaryCacheHook")) && + !key.equals(new Path("pfile:" + tmp_dir + "/VerifyContentSummaryCacheHook"))) { Assert.fail("VerifyContentSummaryCacheHook fails the input path check"); } } diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java index 9054e0e..6fd986f 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java @@ -16,6 +16,7 @@ import org.apache.thrift.EncodingUtils; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer; import org.apache.thrift.server.AbstractNonblockingServer.*; import java.util.List; import java.util.ArrayList; @@ -12588,7 +12589,7 @@ public get_databases_args getEmptyArgsInstance() { return new get_databases_args(); } - public AsyncMethodCallback> getResultHandler(final AsyncFrameBuffer fb, final int seqid) { + public AsyncMethodCallback> getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; return new AsyncMethodCallback>() { public void onComplete(List o) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index a92331a..5c69dba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -66,8 +66,7 @@ private int resDirFilesNum; boolean initialized; String originalTracker = null; - private final Map pathToCS = new ConcurrentHashMap(); - + private final Map pathToCS = new ConcurrentHashMap(); // scratch path to use for all non-local (ie. 
hdfs) file system tmp folders private final Path nonLocalScratchPath; @@ -639,7 +638,7 @@ public void restoreOriginalTracker() { } } - public void addCS(String path, ContentSummary cs) { + public void addCS(Path path, ContentSummary cs) { pathToCS.put(path, cs); } @@ -651,7 +650,7 @@ public ContentSummary getCS(String path) { return pathToCS.get(path); } - public Map getPathToCS() { + public Map getPathToCS() { return pathToCS; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 3d8ca92..3d211bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -584,11 +584,11 @@ private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc) // merge work only needs input and output. MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(), mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName()); - LinkedHashMap> pathToAliases = - new LinkedHashMap>(); + LinkedHashMap> pathToAliases = + new LinkedHashMap>(); ArrayList inputDirstr = new ArrayList(1); inputDirstr.add(mergeFilesDesc.getInputDir().toString()); - pathToAliases.put(mergeFilesDesc.getInputDir().get(0).toString(), inputDirstr); + pathToAliases.put(mergeFilesDesc.getInputDir().get(0), inputDirstr); mergeWork.setPathToAliases(pathToAliases); mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx()); mergeWork.resolveConcatenateMerge(db.getConf()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index afc03ed..21da292 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -97,8 +98,8 @@ protected transient long logEveryNRows = 0; // input path --> {operator --> context} - private final Map, MapOpCtx>> opCtxMap = - new HashMap, MapOpCtx>>(); + private final Map, MapOpCtx>> opCtxMap = + new HashMap, MapOpCtx>>(); // child operator --> object inspector (converted OI if it's needed) private final Map, StructObjectInspector> childrenOpToOI = new HashMap, StructObjectInspector>(); @@ -307,7 +308,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx, try { Map oiSettableProperties = new HashMap(); - for (String onefile : conf.getPathToAliases().keySet()) { + for (Path onefile : conf.getPathToAliases().keySet()) { PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile); TableDesc tableDesc = pd.getTableDesc(); Deserializer partDeserializer = pd.getDeserializer(hconf); @@ -381,8 +382,8 @@ public void setChildren(Configuration hconf) throws Exception { Map convertedOI = getConvertedOI(hconf); - for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { - String onefile = entry.getKey(); + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + Path onefile = entry.getKey(); List aliases = entry.getValue(); PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); @@ -436,11 +437,10 @@ private void initOperatorContext(List> children } } - private String getNominalPath(Path fpath) { - String nominal = null; + private Path getNominalPath(Path fpath) { + Path nominal = null; boolean schemaless = fpath.toUri().getScheme() == null; - for (String onefile : conf.getPathToAliases().keySet()) { - Path onepath = normalizePath(onefile, schemaless); + for (Path onepath : conf.getPathToAliases().keySet()) { Path curfpath = fpath; if(!schemaless && onepath.toUri().getScheme() == null) { curfpath = new Path(fpath.toUri().getPath()); @@ -453,7 +453,7 @@ private String getNominalPath(Path fpath) { if (nominal != null) { throw new IllegalStateException("Ambiguous input path " + fpath); } - nominal = onefile; + nominal = onepath; } if (nominal == null) { throw new IllegalStateException("Invalid input path " + fpath); @@ -498,7 +498,7 @@ public void closeOp(boolean abort) throws HiveException { public void cleanUpInputFileChangedOp() throws HiveException { super.cleanUpInputFileChangedOp(); Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); Map, MapOpCtx> contexts = opCtxMap.get(nominalPath); if (isLogInfoEnabled) { StringBuilder builder = new StringBuilder(); @@ -522,19 +522,6 @@ public void cleanUpInputFileChangedOp() throws HiveException { currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]); } - private Path normalizePath(String onefile, boolean schemaless) { - //creating Path is expensive, so cache the corresponding - //Path object in normalizedPaths - Path path = normalizedPaths.get(onefile); - if (path == null) { - path = new Path(onefile); - if (schemaless && path.toUri().getScheme() != null) { - path = new Path(path.toUri().getPath()); - } - normalizedPaths.put(onefile, path); - } - return path; - } public void process(Writable value) throws HiveException { // A mapper can span multiple files/partitions. 
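/*
 * Illustrative sketch, not part of the patch (the class and helper below are
 * made-up names): why the Path-keyed lookups in MapOperator above keep a
 * scheme-stripped copy of each path around.  org.apache.hadoop.fs.Path.equals()
 * compares the whole underlying URI, so a fully qualified path and its
 * scheme-less form are never equal even when they name the same directory.
 */
import java.net.URI;

import org.apache.hadoop.fs.Path;

public class PathEqualityDemo {

  /** Drop scheme and authority so "hdfs://nn:8020/warehouse/t" compares equal to "/warehouse/t". */
  static Path stripSchemeAndAuthority(Path p) {
    URI uri = p.toUri();
    return uri.getScheme() == null ? p : new Path(uri.getPath());
  }

  public static void main(String[] args) {
    Path qualified = new Path("hdfs://nn:8020/warehouse/t/ds=2001-02-21");
    Path plain = new Path("/warehouse/t/ds=2001-02-21");

    System.out.println(qualified.equals(plain));                          // false: the URIs differ
    System.out.println(stripSchemeAndAuthority(qualified).equals(plain)); // true: same path component
  }
}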
@@ -683,7 +670,7 @@ public OperatorType getType() { public void initializeContexts() { Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); Map, MapOpCtx> contexts = opCtxMap.get(nominalPath); currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index c428812..20bb408 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -349,7 +349,7 @@ public int execute(DriverContext driverContext) { List sortCols = null; int numBuckets = -1; Task task = this; - String path = tbd.getSourcePath().toUri().toString(); + Path path = tbd.getSourcePath(); // Find the first ancestor of this MoveTask which is some form of map reduce task // (Either standard, local, or a merge) while (task.getParentTasks() != null && task.getParentTasks().size() == 1) { @@ -385,7 +385,7 @@ public int execute(DriverContext driverContext) { // condition for merging is not met, see GenMRFileSink1. if (task instanceof MoveTask) { if (((MoveTask)task).getWork().getLoadFileWork() != null) { - path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString(); + path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath(); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 69ee076..9921606 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -1082,7 +1083,7 @@ public void cleanUpInputFileChangedOp() throws HiveException { } // called by map operator. 
propagated recursively to single parented descendants - public void setInputContext(String inputPath, String tableName, String partitionName) { + public void setInputContext(Path inputPath, String tableName, String partitionName) { if (childOperators != null) { for (Operator child : childOperators) { if (child.getNumParent() == 1) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index 465c508..ba5d4be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -319,7 +320,7 @@ void displayBrokenPipeInfo() { private transient String partitionName ; @Override - public void setInputContext(String inputPath, String tableName, String partitionName) { + public void setInputContext(Path inputPath, String tableName, String partitionName) { this.tableName = tableName; this.partitionName = partitionName; super.setInputContext(inputPath, tableName, partitionName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1d8e3b1..fe3a140 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2693,16 +2693,15 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa long[] summary = {0, 0, 0}; - final List pathNeedProcess = new ArrayList(); + final List pathNeedProcess = new ArrayList(); // Since multiple threads could call this method concurrently, locking // this method will avoid number of threads out of control. synchronized (INPUT_SUMMARY_LOCK) { // For each input path, calculate the total size. 
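/*
 * Illustrative sketch, not part of the patch (class and method names are
 * made up): the shape of the summary-gathering step once everything is keyed
 * by Path.  Each input directory is summarized at most once, the result is
 * cached under its Path, and the total is accumulated without any
 * String <-> Path round trips.
 */
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InputSummarySketch {

  /** Fill the cache for any path not yet summarized and return the combined byte length. */
  static long summarize(Collection<Path> inputPaths,
                        Map<Path, ContentSummary> cache,
                        Configuration conf) throws IOException {
    long totalLength = 0;
    for (Path p : inputPaths) {
      ContentSummary cs = cache.get(p);
      if (cs == null) {
        FileSystem fs = p.getFileSystem(conf);   // resolved from the path's own scheme
        cs = fs.getContentSummary(p);
        cache.put(p, cs);
      }
      totalLength += cs.getLength();
    }
    return totalLength;
  }

  public static void main(String[] args) throws IOException {
    Map<Path, ContentSummary> cache = new ConcurrentHashMap<Path, ContentSummary>();
    long bytes = summarize(Arrays.asList(new Path("file:///tmp")), cache, new Configuration());
    System.out.println(bytes + " bytes under file:///tmp");
  }
}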
- for (String path : work.getPathToAliases().keySet()) { - Path p = new Path(path); + for (Path path : work.getPathToAliases().keySet()) { - if (filter != null && !filter.accept(p)) { + if (filter != null && !filter.accept(path)) { continue; } @@ -2720,7 +2719,7 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa } // Process the case when name node call is needed - final Map resultMap = new ConcurrentHashMap(); + final Map resultMap = new ConcurrentHashMap(); ArrayList> results = new ArrayList>(); final ThreadPoolExecutor executor; int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0); @@ -2736,9 +2735,9 @@ public static ContentSummary getInputSummary(final Context ctx, MapWork work, Pa HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { @Override public void interrupt() { - for (String path : pathNeedProcess) { + for (Path path : pathNeedProcess) { try { - new Path(path).getFileSystem(ctx.getConf()).close(); + path.getFileSystem(ctx.getConf()).close(); } catch (IOException ignore) { LOG.debug(ignore); } @@ -2751,9 +2750,8 @@ public void interrupt() { try { Configuration conf = ctx.getConf(); JobConf jobConf = new JobConf(conf); - for (String path : pathNeedProcess) { - final Path p = new Path(path); - final String pathStr = path; + for (final Path path : pathNeedProcess) { + // All threads share the same Configuration and JobConf based on the // assumption that they are thread safe if only read operations are // executed. It is not stated in Hadoop's javadoc, the sourcce codes @@ -2763,9 +2761,9 @@ public void interrupt() { final Configuration myConf = conf; final JobConf myJobConf = jobConf; final Map> aliasToWork = work.getAliasToWork(); - final Map> pathToAlias = work.getPathToAliases(); + final Map> pathToAlias = work.getPathToAliases(); final PartitionDesc partDesc = work.getPathToPartitionInfo().get( - p.toString()); + path); Runnable r = new Runnable() { @Override public void run() { @@ -2776,7 +2774,7 @@ public void run() { inputFormatCls, myJobConf); if (inputFormatObj instanceof ContentSummaryInputFormat) { ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj; - resultMap.put(pathStr, cs.getContentSummary(p, myJobConf)); + resultMap.put(path, cs.getContentSummary(path, myJobConf)); return; } HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, @@ -2788,7 +2786,7 @@ public void run() { long total = 0; TableDesc tableDesc = partDesc.getTableDesc(); InputEstimator estimator = (InputEstimator) handler; - for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) { + for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, path)) { JobConf jobConf = new JobConf(myJobConf); TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias); Utilities.setColumnNameList(jobConf, scanOp, true); @@ -2797,19 +2795,19 @@ public void run() { Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf); total += estimator.estimate(myJobConf, scanOp, -1).getTotalLength(); } - resultMap.put(pathStr, new ContentSummary(total, -1, -1)); + resultMap.put(path, new ContentSummary(total, -1, -1)); } // todo: should nullify summary for non-native tables, // not to be selected as a mapjoin target - FileSystem fs = p.getFileSystem(myConf); - resultMap.put(pathStr, fs.getContentSummary(p)); + FileSystem fs = path.getFileSystem(myConf); + resultMap.put(path, fs.getContentSummary(path)); } catch (Exception e) { // We safely ignore this exception for 
summary data. // We don't update the cache to protect it from polluting other // usages. The worst case is that IOException will always be // retried for another getInputSummary(), which is fine as // IOException is not considered as a common case. - LOG.info("Cannot get size of " + pathStr + ". Safely ignored."); + LOG.info("Cannot get size of " + path + ". Safely ignored."); } } }; @@ -2841,7 +2839,7 @@ public void run() { executor.shutdown(); } HiveInterruptUtils.checkInterrupted(); - for (Map.Entry entry : resultMap.entrySet()) { + for (Map.Entry entry : resultMap.entrySet()) { ContentSummary cs = entry.getValue(); summary[0] += cs.getLength(); @@ -3595,11 +3593,10 @@ public static double getHighestSamplePercentage (MapWork work) { // The alias may not have any path Path path = null; - for (String file : new LinkedList(work.getPathToAliases().keySet())) { - List aliases = work.getPathToAliases().get(file); + for (Path pathone : new LinkedList(work.getPathToAliases().keySet())) { + List aliases = work.getPathToAliases().get(pathone); if (aliases.contains(alias)) { - path = new Path(file); - + path = pathone; // Multiple aliases can point to the same path - it should be // processed only once if (pathsProcessed.contains(path)) { @@ -3672,10 +3669,8 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map Path hiveScratchDir, String alias, int sequenceNumber) throws Exception { - String strPath = path.toString(); - // The input file does not exist, replace it by a empty file - PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath); + PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); if (partDesc.getTableDesc().isNonNative()) { // if this isn't a hive table we can't create an empty file for it. 
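/*
 * Illustrative sketch, not part of the patch (types are simplified and the
 * method name is made up): when an empty input directory is swapped for a
 * dummy file, every Path-keyed structure in the MapWork has to be re-keyed
 * the same way -- take the value out from under the old Path and register it
 * under the new one.
 */
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;

import org.apache.hadoop.fs.Path;

public class RekeyPathMapsSketch {

  /** Move the aliases registered under oldPath so they are looked up via newPath from now on. */
  static void rekeyAliases(LinkedHashMap<Path, ArrayList<String>> pathToAliases,
                           Path oldPath, Path newPath) {
    ArrayList<String> aliases = pathToAliases.remove(oldPath);
    if (aliases != null) {
      pathToAliases.put(newPath, aliases);
    }
  }

  public static void main(String[] args) {
    LinkedHashMap<Path, ArrayList<String>> pathToAliases =
        new LinkedHashMap<Path, ArrayList<String>>();
    Path emptyPartition = new Path("hdfs:///warehouse/t/ds=2001-02-21");
    Path dummyFile = new Path("hdfs:///tmp/scratch/dummy_file");
    pathToAliases.put(emptyPartition, new ArrayList<String>(Arrays.asList("t")));

    rekeyAliases(pathToAliases, emptyPartition, dummyFile);
    System.out.println(pathToAliases.containsKey(dummyFile));   // true: alias list now under the dummy path
  }
}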
return path; @@ -3691,21 +3686,21 @@ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, Map sequenceNumber, props, oneRow); if (LOG.isInfoEnabled()) { - LOG.info("Changed input file " + strPath + " to empty file " + newPath); + LOG.info("Changed input file " + path + " to empty file " + newPath); } // update the work String strNewPath = newPath.toString(); - LinkedHashMap> pathToAliases = work.getPathToAliases(); - pathToAliases.put(strNewPath, pathToAliases.get(strPath)); - pathToAliases.remove(strPath); + LinkedHashMap> pathToAliases = work.getPathToAliases(); + pathToAliases.put(newPath, pathToAliases.get(newPath)); + pathToAliases.remove(path); work.setPathToAliases(pathToAliases); - LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); - pathToPartitionInfo.put(strNewPath, pathToPartitionInfo.get(strPath)); - pathToPartitionInfo.remove(strPath); + LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); + pathToPartitionInfo.put(newPath, pathToPartitionInfo.get(newPath)); + pathToPartitionInfo.remove(path); work.setPathToPartitionInfo(pathToPartitionInfo); return newPath; @@ -3734,16 +3729,16 @@ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, // update the work - LinkedHashMap> pathToAliases = work.getPathToAliases(); + LinkedHashMap> pathToAliases = work.getPathToAliases(); ArrayList newList = new ArrayList(); newList.add(alias); - pathToAliases.put(newPath.toUri().toString(), newList); + pathToAliases.put(newPath, newList); work.setPathToAliases(pathToAliases); - LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); + LinkedHashMap pathToPartitionInfo = work.getPathToPartitionInfo(); PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone(); - pathToPartitionInfo.put(newPath.toUri().toString(), pDesc); + pathToPartitionInfo.put(newPath, pDesc); work.setPathToPartitionInfo(pathToPartitionInfo); return newPath; @@ -3802,7 +3797,7 @@ public static void setInputAttributes(Configuration conf, MapWork mWork) { public static void createTmpDirs(Configuration conf, MapWork mWork) throws IOException { - Map> pa = mWork.getPathToAliases(); + Map> pa = mWork.getPathToAliases(); if (pa != null) { List> ops = new ArrayList>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 4160399..b10abdc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -493,12 +493,12 @@ private void handleSampling(Context context, MapWork mWork, JobConf job) Operator topOp = mWork.getAliasToWork().get(alias); PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias); - ArrayList paths = mWork.getPaths(); + ArrayList paths = mWork.getPaths(); ArrayList parts = mWork.getPartitionDescs(); List inputPaths = new ArrayList(paths.size()); - for (String path : paths) { - inputPaths.add(new Path(path)); + for (Path path : paths) { + inputPaths.add(path); } Path tmpPath = context.getExternalTmpPath(inputPaths.get(0)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java index 52913e0..cf6eb14 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java @@ -197,9 +197,9 @@ private void 
applyFilterToPartitions( Object[] row = new Object[1]; - Iterator it = work.getPathToPartitionInfo().keySet().iterator(); + Iterator it = work.getPathToPartitionInfo().keySet().iterator(); while (it.hasNext()) { - String p = it.next(); + Path p = it.next(); PartitionDesc desc = work.getPathToPartitionInfo().get(p); Map spec = desc.getPartSpec(); if (spec == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index c3860b3..d79ab73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -144,12 +144,12 @@ public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes"; private void addCredentials(MapWork mapWork, DAG dag) { - Set paths = mapWork.getPathToAliases().keySet(); + Set paths = mapWork.getPathToAliases().keySet(); if (!paths.isEmpty()) { - Iterator pathIterator = Iterators.transform(paths.iterator(), new Function() { + Iterator pathIterator = Iterators.transform(paths.iterator(), new Function() { @Override - public URI apply(String input) { - return new Path(input).toUri(); + public URI apply(Path input) { + return input.toUri(); } }); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java index 0c32bd5..7a72429 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; @@ -285,9 +286,9 @@ private void applyFilterToPartitions(Converter converter, ExprNodeEvaluator eval Object[] row = new Object[1]; - Iterator it = work.getPathToPartitionInfo().keySet().iterator(); + Iterator it = work.getPathToPartitionInfo().keySet().iterator(); while (it.hasNext()) { - String p = it.next(); + Path p = it.next(); PartitionDesc desc = work.getPathToPartitionInfo().get(p); Map spec = desc.getPartSpec(); if (spec == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java index c169677..c0b6330 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java @@ -58,7 +58,7 @@ // TODO This needs to be looked at. Map of Map to Map... Made concurrent for now since split generation // can happen in parallel. 
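/*
 * Illustrative sketch, not part of the patch (the keep-predicate and names
 * are made up): the dynamic-partition-pruning loops above walk a Path-keyed
 * map and drop partitions whose spec is filtered out.  Going through an
 * explicit Iterator keeps the removal safe while the iteration is in flight.
 */
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;

public class PrunePartitionsSketch {

  /** Remove every partition whose spec does not carry the kept value; returns how many were dropped. */
  static int prune(Map<Path, Map<String, String>> pathToPartSpec, String column, String keepValue) {
    int dropped = 0;
    Iterator<Map.Entry<Path, Map<String, String>>> it = pathToPartSpec.entrySet().iterator();
    while (it.hasNext()) {
      Map<String, String> spec = it.next().getValue();
      if (spec != null && !keepValue.equals(spec.get(column))) {
        it.remove();                       // safe: the removal goes through the iterator
        dropped++;
      }
    }
    return dropped;
  }

  public static void main(String[] args) {
    Map<Path, Map<String, String>> parts = new HashMap<Path, Map<String, String>>();
    Map<String, String> keep = new HashMap<String, String>();
    keep.put("ds", "2001-02-21");
    Map<String, String> drop = new HashMap<String, String>();
    drop.put("ds", "1999-01-01");
    parts.put(new Path("/warehouse/t/ds=2001-02-21"), keep);
    parts.put(new Path("/warehouse/t/ds=1999-01-01"), drop);
    System.out.println(prune(parts, "ds", "2001-02-21") + " partition(s) pruned");
  }
}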
- private static final Map, Map> cache = + private static final Map, Map> cache = new ConcurrentHashMap<>(); private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index efb06b2..0334a35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; @@ -136,7 +137,7 @@ public void init(StructObjectInspector structObjectInspector, String[] scratchCo public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf, FileSplit split, Object[] partitionValues) throws IOException { - Map pathToPartitionInfo = Utilities + Map pathToPartitionInfo = Utilities .getMapWork(hiveConf).getPathToPartitionInfo(); PartitionDesc partDesc = HiveFileFormatUtils diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index bed17e9..49559b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.TaskRunner; @@ -51,7 +52,7 @@ private Index depMap; private UserGroupInformation ugi; private HookType hookType; - final private Map inputPathToContentSummary; + final private Map inputPathToContentSummary; private final String ipAddress; private final String userName; // unique id set for operation when run from HS2, base64 encoded value of @@ -59,7 +60,7 @@ private final String operationId; public HookContext(QueryPlan queryPlan, HiveConf conf, - Map inputPathToContentSummary, String userName, String ipAddress, + Map inputPathToContentSummary, String userName, String ipAddress, String operationId) throws Exception { this.queryPlan = queryPlan; this.conf = conf; @@ -147,7 +148,7 @@ public void setUgi(UserGroupInformation ugi) { this.ugi = ugi; } - public Map getInputPathToContentSummary() { + public Map getInputPathToContentSummary() { return inputPathToContentSummary; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java index edcc3b6..617b986 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java @@ -68,8 +68,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, throw new IOException("cannot find class " + inputFormatClassName); } - pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath() - .toString(), hsplit.getPath().toUri().getPath()); + pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath()); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index e13c4dd..5898993 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -132,7 +132,7 @@ public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobCo private String inputFormatClassName; private CombineFileSplit inputSplitShim; - private Map pathToPartitionInfo; + private Map pathToPartitionInfo; public CombineHiveInputSplit() throws IOException { this(ShimLoader.getHadoopShims().getCombineFileInputFormat() @@ -147,7 +147,7 @@ public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim) this(job, inputSplitShim, null); } public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim, - Map pathToPartitionInfo) throws IOException { + Map pathToPartitionInfo) throws IOException { this.inputSplitShim = inputSplitShim; this.pathToPartitionInfo = pathToPartitionInfo; if (job != null) { @@ -324,10 +324,10 @@ public int hashCode() { * Create Hive splits based on CombineFileSplit. */ private InputSplit[] getCombineSplits(JobConf job, int numSplits, - Map pathToPartitionInfo) + Map pathToPartitionInfo) throws IOException { init(job); - Map> pathToAliases = mrwork.getPathToAliases(); + Map> pathToAliases = mrwork.getPathToAliases(); Map> aliasToWork = mrwork.getAliasToWork(); CombineFileInputFormatShim combine = ShimLoader.getHadoopShims() @@ -573,7 +573,7 @@ public int hashCode() { if (combinablePaths.size() > 0) { FileInputFormat.setInputPaths(job, combinablePaths.toArray (new Path[combinablePaths.size()])); - Map pathToPartitionInfo = this.pathToPartitionInfo != null ? + Map pathToPartitionInfo = this.pathToPartitionInfo != null ? 
this.pathToPartitionInfo : Utilities.getMapWork(job).getPathToPartitionInfo(); InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); for (InputSplit split : splits) { @@ -618,8 +618,8 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, HashMap nameToSamples = mrwork.getNameToSplitSample(); List retLists = new ArrayList(); Map> aliasToSplitList = new HashMap>(); - Map> pathToAliases = mrwork.getPathToAliases(); - Map> pathToAliasesNoScheme = removeScheme(pathToAliases); + Map> pathToAliases = mrwork.getPathToAliases(); + Map> pathToAliasesNoScheme = removeScheme(pathToAliases); // Populate list of exclusive splits for every sampled alias // @@ -688,11 +688,10 @@ private void processPaths(JobConf job, CombineFileInputFormatShim combine, return retLists; } - Map> removeScheme(Map> pathToAliases) { - Map> result = new HashMap>(); - for (Map.Entry > entry : pathToAliases.entrySet()) { - String newKey = new Path(entry.getKey()).toUri().getPath(); - result.put(newKey, entry.getValue()); + Map> removeScheme(Map> pathToAliases) { + Map> result = new HashMap>(); + for (Map.Entry > entry : pathToAliases.entrySet()) { + result.put(entry.getKey(), entry.getValue()); } return result; } @@ -720,8 +719,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, } pushProjectionsAndFilters(job, inputFormatClass, - hsplit.getPath(0).toString(), - hsplit.getPath(0).toUri().getPath()); + hsplit.getPath(0)); return ShimLoader.getHadoopShims().getCombineFileInputFormat() .getRecordReader(job, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index 9b3f8ec..5894d17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -66,7 +66,7 @@ private boolean wasUsingSortedSearch = false; private String genericUDFClassName = null; private final List stopComparisons = new ArrayList(); - private Map pathToPartitionInfo; + private Map pathToPartitionInfo; protected RecordReader recordReader; protected JobConf jobConf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index f6a0cb9..7dcb223 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -256,7 +256,7 @@ public static RecordWriter getHiveRecordWriter(JobConf jc, jc_output = new JobConf(jc); String codecStr = conf.getCompressCodec(); if (codecStr != null && !codecStr.trim().equals("")) { - Class codec = + Class codec = (Class) JavaUtils.loadClass(codecStr); FileOutputFormat.setOutputCompressorClass(jc_output, codec); } @@ -349,16 +349,16 @@ private static RecordUpdater getRecordUpdater(JobConf jc, } public static PartitionDesc getPartitionDescFromPathRecursively( - Map pathToPartitionInfo, Path dir, - Map, Map> cacheMap) + Map pathToPartitionInfo, Path dir, + Map, Map> cacheMap) throws IOException { return getPartitionDescFromPathRecursively(pathToPartitionInfo, dir, cacheMap, false); } public static PartitionDesc getPartitionDescFromPathRecursively( - Map pathToPartitionInfo, Path dir, - Map, Map> cacheMap, + Map pathToPartitionInfo, Path dir, + Map, Map> cacheMap, boolean ignoreSchema) throws IOException { PartitionDesc part = doGetPartitionDescFromPath(pathToPartitionInfo, dir); @@ 
-372,13 +372,13 @@ public static PartitionDesc getPartitionDescFromPathRecursively( ) { - Map newPathToPartitionInfo = null; + Map newPathToPartitionInfo = null; if (cacheMap != null) { newPathToPartitionInfo = cacheMap.get(pathToPartitionInfo); } if (newPathToPartitionInfo == null) { // still null - newPathToPartitionInfo = new HashMap(); + newPathToPartitionInfo = new HashMap(); populateNewPartitionDesc(pathToPartitionInfo, newPathToPartitionInfo); if (cacheMap != null) { @@ -395,10 +395,10 @@ public static PartitionDesc getPartitionDescFromPathRecursively( } } - private static boolean pathsContainNoScheme(Map pathToPartitionInfo) { + private static boolean pathsContainNoScheme(Map pathToPartitionInfo) { - for( Entry pe : pathToPartitionInfo.entrySet()){ - if(new Path(pe.getKey()).toUri().getScheme() != null){ + for( Entry pe : pathToPartitionInfo.entrySet()){ + if(pe.getKey().toUri().getScheme() != null){ return false; } } @@ -407,39 +407,35 @@ private static boolean pathsContainNoScheme(Map pathToPar } private static void populateNewPartitionDesc( - Map pathToPartitionInfo, - Map newPathToPartitionInfo) { - for (Map.Entry entry: pathToPartitionInfo.entrySet()) { - String entryKey = entry.getKey(); - PartitionDesc partDesc = entry.getValue(); - Path newP = new Path(entryKey); - String pathOnly = newP.toUri().getPath(); - newPathToPartitionInfo.put(pathOnly, partDesc); + Map pathToPartitionInfo, + Map newPathToPartitionInfo) { + for (Map.Entry entry: pathToPartitionInfo.entrySet()) { + newPathToPartitionInfo.put(new Path(entry.getKey().toUri().getPath()), entry.getValue()); } } private static PartitionDesc doGetPartitionDescFromPath( - Map pathToPartitionInfo, Path dir) { + Map pathToPartitionInfo, Path dir) { // We first do exact match, and then do prefix matching. 
The latter is due to input dir // could be /dir/ds='2001-02-21'/part-03 where part-03 is not part of partition - String dirPath = dir.toUri().getPath(); - PartitionDesc part = pathToPartitionInfo.get(dir.toString()); + + PartitionDesc part = pathToPartitionInfo.get(dir); if (part == null) { // LOG.warn("exact match not found, try ripping input path's theme and authority"); - part = pathToPartitionInfo.get(dirPath); + part = pathToPartitionInfo.get(new Path(dir.toUri().getPath())); } if (part == null) { - Path curPath = new Path(dir.toUri().getPath()).getParent(); + Path curPath = dir.getParent(); dir = dir.getParent(); while (dir != null) { // first try full match - part = pathToPartitionInfo.get(dir.toString()); + part = pathToPartitionInfo.get(dir); if (part == null) { // exact match not found, try ripping input path's scheme and authority - part = pathToPartitionInfo.get(curPath.toString()); + part = pathToPartitionInfo.get(curPath); } if (part != null) { break; @@ -451,8 +447,8 @@ private static PartitionDesc doGetPartitionDescFromPath( return part; } - private static boolean foundAlias(Map> pathToAliases, - String path) { + private static boolean foundAlias(Map> pathToAliases, + Path path) { List aliases = pathToAliases.get(path); if ((aliases == null) || (aliases.isEmpty())) { return false; @@ -460,43 +456,34 @@ private static boolean foundAlias(Map> pathToAliases, return true; } - private static String getMatchingPath(Map> pathToAliases, + + private static Path getMatchingPath(Map> pathToAliases, Path dir) { - // First find the path to be searched - String path = dir.toString(); - if (foundAlias(pathToAliases, path)) { - return path; - } + // First find the path to be searched - String dirPath = dir.toUri().getPath(); - if(Shell.WINDOWS){ - //temp hack - //do this to get rid of "/" before the drive letter in windows - dirPath = new Path(dirPath).toString(); - } - if (foundAlias(pathToAliases, dirPath)) { - return dirPath; - } - path = dirPath; - - String dirStr = dir.toString(); - int dirPathIndex = dirPath.lastIndexOf(Path.SEPARATOR); - int dirStrIndex = dirStr.lastIndexOf(Path.SEPARATOR); - while (dirPathIndex >= 0 && dirStrIndex >= 0) { - dirStr = dirStr.substring(0, dirStrIndex); - dirPath = dirPath.substring(0, dirPathIndex); - //first try full match - if (foundAlias(pathToAliases, dirStr)) { - return dirStr; - } - if (foundAlias(pathToAliases, dirPath)) { - return dirPath; - } - dirPathIndex = dirPath.lastIndexOf(Path.SEPARATOR); - dirStrIndex = dirStr.lastIndexOf(Path.SEPARATOR); + if (foundAlias(pathToAliases, dir)) { + return dir; + } + + Path path = new Path(dir.toUri().getPath()); + + if (foundAlias(pathToAliases, path)) { + return path; + } + + while (dir != null && path != null) { + dir = dir.getParent(); + path = path.getParent(); + //first try full match + if (foundAlias(pathToAliases, dir)) { + return dir; + } + if (foundAlias(pathToAliases, path)) { + return path; + } + } + return null; } - return null; - } /** * Get the list of operators from the operator tree that are needed for the path @@ -505,7 +492,7 @@ private static String getMatchingPath(Map> pathToAlias * @param dir The path to look for **/ public static List> doGetWorksFromPath( - Map> pathToAliases, + Map> pathToAliases, Map> aliasToWork, Path dir) { List> opList = new ArrayList>(); @@ -523,12 +510,12 @@ private static String getMatchingPath(Map> pathToAlias * @param dir The path to look for **/ public static List doGetAliasesFromPath( - Map> pathToAliases, + Map> pathToAliases, Path dir) { if 
(pathToAliases == null) { return new ArrayList(); } - String path = getMatchingPath(pathToAliases, dir); + Path path = getMatchingPath(pathToAliases, dir); return pathToAliases.get(path); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 05239e6..082af94 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -86,7 +86,7 @@ private JobConf job; // both classes access by subclasses - protected Map pathToPartitionInfo; + protected Map pathToPartitionInfo; protected MapWork mrwork; /** @@ -237,14 +237,13 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, } boolean nonNative = false; - PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath().toString()); + PartitionDesc part = pathToPartitionInfo.get(hsplit.getPath()); if ((part != null) && (part.getTableDesc() != null)) { Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), job); nonNative = part.getTableDesc().isNonNative(); } - pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath() - .toString(), hsplit.getPath().toUri().getPath(), nonNative); + pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(), nonNative); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); RecordReader innerReader = null; @@ -326,11 +325,11 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs; if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { - Set pathStrings = mrwork.getPathToPartitionInfo().keySet(); + Set pathStrings = mrwork.getPathToPartitionInfo().keySet(); dirs = new Path[pathStrings.size()]; - Iterator it = pathStrings.iterator(); + Iterator it = pathStrings.iterator(); for (int i = 0; i < dirs.length; i++) { - dirs[i] = new Path(it.next()); + dirs[i] = it.next(); } } else { dirs = FileInputFormat.getInputPaths(job); @@ -377,7 +376,7 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job TableDesc table = part.getTableDesc(); TableScanOperator tableScan = null; - List aliases = mrwork.getPathToAliases().get(dir.toString()); + List aliases = mrwork.getPathToAliases().get(dir); // Make filter pushdown information available to getSplits. 
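/*
 * Illustrative sketch, not part of the patch (PartitionDesc is replaced by a
 * plain String value so the example is self-contained): the exact-match,
 * then ancestor-walk lookup that the rewritten getMatchingPath and
 * doGetPartitionDescFromPath above rely on.  Try the directory as given,
 * then its scheme-stripped form, then climb getParent() until a registered
 * key is found.
 */
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;

public class AncestorLookupSketch {

  static <V> V lookup(Map<Path, V> byPath, Path dir) {
    Path stripped = new Path(dir.toUri().getPath());   // same path, scheme and authority removed
    while (dir != null) {
      V v = byPath.get(dir);                           // full match first
      if (v == null && stripped != null) {
        v = byPath.get(stripped);                      // then the scheme-less form
      }
      if (v != null) {
        return v;
      }
      dir = dir.getParent();                           // climb towards the table root
      stripped = stripped == null ? null : stripped.getParent();
    }
    return null;
  }

  public static void main(String[] args) {
    Map<Path, String> partInfo = new HashMap<Path, String>();
    partInfo.put(new Path("/warehouse/t/ds=2001-02-21"), "partition ds=2001-02-21");
    // The split sits below the registered partition directory; the walk still finds it.
    Path split = new Path("hdfs://nn:8020/warehouse/t/ds=2001-02-21/part-03/000000_0");
    System.out.println(lookup(partInfo, split));
  }
}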
if ((aliases != null) && (aliases.size() == 1)) { @@ -472,11 +471,11 @@ private void pushProjection(final JobConf newjob, final StringBuilder readColumn } protected static PartitionDesc getPartitionDescFromPath( - Map pathToPartitionInfo, Path dir) + Map pathToPartitionInfo, Path dir) throws IOException { - PartitionDesc partDesc = pathToPartitionInfo.get(dir.toString()); + PartitionDesc partDesc = pathToPartitionInfo.get(dir); if (partDesc == null) { - partDesc = pathToPartitionInfo.get(dir.toUri().getPath()); + partDesc = pathToPartitionInfo.get(new Path(dir.toUri().getPath())); } if (partDesc == null) { throw new IOException("cannot find dir = " + dir.toString() @@ -532,13 +531,12 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { } protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass, - String splitPath, String splitPathWithNoSchema) { - pushProjectionsAndFilters(jobConf, inputFormatClass, splitPath, - splitPathWithNoSchema, false); + Path splitPath) { + pushProjectionsAndFilters(jobConf, inputFormatClass, splitPath, false); } protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass, - String splitPath, String splitPathWithNoSchema, boolean nonNative) { + Path splitPath, boolean nonNative) { if (this.mrwork == null) { init(job); } @@ -548,27 +546,25 @@ protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass } ArrayList aliases = new ArrayList(); - Iterator>> iterator = this.mrwork + Iterator>> iterator = this.mrwork .getPathToAliases().entrySet().iterator(); + boolean match; + + Path splitPathWithNoSchema = new Path(splitPath.toUri().getPath()); while (iterator.hasNext()) { - Entry> entry = iterator.next(); - String key = entry.getKey(); - boolean match; - if (nonNative) { - // For non-native tables, we need to do an exact match to avoid - // HIVE-1903. (The table location contains no files, and the string - // representation of its path does not have a trailing slash.) - match = - splitPath.equals(key) || splitPathWithNoSchema.equals(key); - } else { - // But for native tables, we need to do a prefix match for - // subdirectories. (Unlike non-native tables, prefix mixups don't seem - // to be a potential problem here since we are always dealing with the - // path to something deeper than the table location.) 
- match = - splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key); - } + Entry> entry = iterator.next(); + Path key = entry.getKey(); + match = false; + while (key!=null){ + if((splitPath.equals(key))||(splitPathWithNoSchema.equals(key))){ + match = true; + break; + } + key = key.getParent(); + } + + if (match) { ArrayList list = entry.getValue(); for (String val : list) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java index f4a21f8..b6a7866 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.plan.PartitionDesc; /** @@ -48,21 +49,21 @@ public void clear() { } } - private Map, Map> partitionDescMap; + private Map, Map> partitionDescMap; - public Map, Map> allocatePartitionDescMap() { + public Map, Map> allocatePartitionDescMap() { if (partitionDescMap == null) { - partitionDescMap = new HashMap, Map>(); + partitionDescMap = new HashMap, Map>(); } return partitionDescMap; } - public Map, Map> getPartitionDescMap() { + public Map, Map> getPartitionDescMap() { return partitionDescMap; } public void setPartitionDescMap( - Map, Map> partitionDescMap) { + Map, Map> partitionDescMap) { this.partitionDescMap = partitionDescMap; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java index feef854..3d2fa11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java @@ -38,28 +38,26 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { public void rework(HiveConf job, MapredWork work) throws IOException { - Map pathToParts = work.getMapWork().getPathToPartitionInfo(); - List toRemovePaths = new ArrayList(); - Map toAddPathToPart = new HashMap(); - Map> pathToAliases = work.getMapWork().getPathToAliases(); + Map pathToParts = work.getMapWork().getPathToPartitionInfo(); + List toRemovePaths = new ArrayList(); + Map toAddPathToPart = new HashMap(); + Map> pathToAliases = work.getMapWork().getPathToAliases(); - for (Map.Entry pathPartEntry : pathToParts - .entrySet()) { - String path = pathPartEntry.getKey(); + for (Map.Entry pathPartEntry : pathToParts.entrySet()) { + Path path = pathPartEntry.getKey(); PartitionDesc partDesc = pathPartEntry.getValue(); // this path points to a symlink path if (partDesc.getInputFileFormatClass().equals( SymlinkTextInputFormat.class)) { // change to TextInputFormat partDesc.setInputFileFormatClass(TextInputFormat.class); - Path symlinkDir = new Path(path); - FileSystem fileSystem = symlinkDir.getFileSystem(job); - FileStatus fStatus = fileSystem.getFileStatus(symlinkDir); + FileSystem fileSystem = path.getFileSystem(job); + FileStatus fStatus = fileSystem.getFileStatus(path); FileStatus[] symlinks = null; if (!fStatus.isDir()) { symlinks = new FileStatus[] { fStatus }; } else { - symlinks = fileSystem.listStatus(symlinkDir, FileUtils.HIDDEN_FILES_PATH_FILTER); + symlinks = fileSystem.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); } toRemovePaths.add(path); ArrayList aliases = pathToAliases.remove(path); @@ -75,8 +73,8 @@ public void rework(HiveConf job, MapredWork work) throws IOException { while ((line = reader.readLine()) != null) { // no 
check for the line? How to check? // if the line is invalid for any reason, the job will fail. - toAddPathToPart.put(line, partDesc); - pathToAliases.put(line, aliases); + toAddPathToPart.put(new Path(line), partDesc); + pathToAliases.put(new Path(line), aliases); } } finally { org.apache.hadoop.io.IOUtils.closeStream(reader); @@ -86,7 +84,7 @@ public void rework(HiveConf job, MapredWork work) throws IOException { } pathToParts.putAll(toAddPathToPart); - for (String toRemove : toRemovePaths) { + for (Path toRemove : toRemovePaths) { pathToParts.remove(toRemove); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java index 1381514..88826b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java @@ -99,8 +99,8 @@ private Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException // Iterate over the Path -> Partition descriptions to find the partition // that matches our input split. - for (Map.Entry pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){ - String partitionPath = pathsAndParts.getKey(); + for (Map.Entry pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){ + Path partitionPath = pathsAndParts.getKey(); if(pathIsInPartition(split.getPath(), partitionPath)) { if(LOG.isInfoEnabled()) { LOG.info("Matching partition " + partitionPath + @@ -133,14 +133,17 @@ private Schema getSchema(JobConf job, FileSplit split) throws AvroSerdeException return null; } - private boolean pathIsInPartition(Path split, String partitionPath) { - boolean schemeless = split.toUri().getScheme() == null; - if (schemeless) { - String schemelessPartitionPath = new Path(partitionPath).toUri().getPath(); - return split.toString().startsWith(schemelessPartitionPath); - } else { - return split.toString().startsWith(partitionPath); + private static boolean pathIsInPartition(Path split, Path partitionPath) { + if (split.toUri().getScheme() == null){ + partitionPath = new Path(partitionPath.toUri().getPath()); } + while (split!=null){ + if(split.equals(partitionPath)){ + return true; + } + split = split.getParent(); + } + return false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java index fbc87e8..f94b212 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java @@ -79,10 +79,10 @@ public MergeFileWork(List inputPaths, Path outputDir, } partDesc.setInputFileFormatClass(internalInputFormat); if (this.getPathToPartitionInfo() == null) { - this.setPathToPartitionInfo(new LinkedHashMap()); + this.setPathToPartitionInfo(new LinkedHashMap()); } for (Path path : this.inputPaths) { - this.getPathToPartitionInfo().put(path.toString(), partDesc); + this.getPathToPartitionInfo().put(path, partDesc); } this.isListBucketingAlterTableConcatenate = false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index 4480600..3d5c78c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -40,8 +40,8 @@ private static final Log LOG = LogFactory.getLog(ProjectionPusher.class); - private 
final Map pathToPartitionInfo = - new LinkedHashMap(); + private final Map pathToPartitionInfo = + new LinkedHashMap(); /** * MapWork is the Hive object which describes input files, * columns projections, and filters. @@ -58,9 +58,9 @@ private void updateMrWork(final JobConf job) { if (mapWork == null && plan != null && plan.length() > 0) { mapWork = Utilities.getMapWork(job); pathToPartitionInfo.clear(); - for (final Map.Entry entry : mapWork.getPathToPartitionInfo().entrySet()) { + for (final Map.Entry entry : mapWork.getPathToPartitionInfo().entrySet()) { // key contains scheme (such as pfile://) and we want only the path portion fix in HIVE-6366 - pathToPartitionInfo.put(new Path(entry.getKey()).toUri().getPath(), entry.getValue()); + pathToPartitionInfo.put(new Path(entry.getKey().toUri().getPath()), entry.getValue()); } } } @@ -75,12 +75,12 @@ private void pushProjectionsAndFilters(final JobConf jobConf, } final ArrayList aliases = new ArrayList(); - final Iterator>> iterator = mapWork.getPathToAliases().entrySet().iterator(); + final Iterator>> iterator = mapWork.getPathToAliases().entrySet().iterator(); while (iterator.hasNext()) { - final Entry> entry = iterator.next(); - final String key = new Path(entry.getKey()).toUri().getPath(); - if (splitPath.equals(key) || splitPathWithNoSchema.equals(key)) { + final Entry> entry = iterator.next(); + final Path key = entry.getKey(); + if (new Path(splitPath).equals(key) || new Path(splitPathWithNoSchema).equals(key)) { final ArrayList list = entry.getValue(); for (final String val : list) { aliases.add(val); @@ -138,15 +138,15 @@ private void pushFilters(final JobConf jobConf, final TableScanOperator tableSca public JobConf pushProjectionsAndFilters(JobConf jobConf, Path path) - throws IOException { - updateMrWork(jobConf); // TODO: refactor this in HIVE-6366 - final JobConf cloneJobConf = new JobConf(jobConf); - final PartitionDesc part = pathToPartitionInfo.get(path.toString()); - - if ((part != null) && (part.getTableDesc() != null)) { - Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), cloneJobConf); - } - pushProjectionsAndFilters(cloneJobConf, path.toString(), path.toUri().getPath()); - return cloneJobConf; + throws IOException { + updateMrWork(jobConf); // TODO: refactor this in HIVE-6366 + final JobConf cloneJobConf = new JobConf(jobConf); + final PartitionDesc part = pathToPartitionInfo.get(path); + if ((part != null) && (part.getTableDesc() != null)) { + Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), cloneJobConf); + } + pushProjectionsAndFilters(cloneJobConf, path.toString(), path.toUri().getPath()); + return cloneJobConf; } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java index c0a8ae7..020f4d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java @@ -52,10 +52,10 @@ public PartialScanWork(List inputPaths) { PartitionDesc partDesc = new PartitionDesc(); partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class); if(this.getPathToPartitionInfo() == null) { - this.setPathToPartitionInfo(new LinkedHashMap()); + this.setPathToPartitionInfo(new LinkedHashMap()); } for(Path path: this.inputPaths) { - this.getPathToPartitionInfo().put(path.toString(), partDesc); + this.getPathToPartitionInfo().put(path, partDesc); } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java index d63aa29..435e630 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java @@ -64,9 +64,9 @@ public ColumnTruncateWork(List droppedColumns, Path inputDir, Path outp PartitionDesc partDesc = new PartitionDesc(); partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class); if(this.getPathToPartitionInfo() == null) { - this.setPathToPartitionInfo(new LinkedHashMap()); + this.setPathToPartitionInfo(new LinkedHashMap()); } - this.getPathToPartitionInfo().put(inputDir.toString(), partDesc); + this.getPathToPartitionInfo().put(inputDir, partDesc); } public Path getInputDir() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 40d0e34..44e39de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -240,11 +240,11 @@ private static void setUnionPlan(GenMRProcContext opProcCtx, String taskTmpDir = taskTmpDirLst.get(pos); TableDesc tt_desc = tt_descLst.get(pos); MapWork mWork = plan.getMapWork(); - if (mWork.getPathToAliases().get(taskTmpDir) == null) { - mWork.getPathToAliases().put(taskTmpDir, + if (mWork.getPathToAliases().get(new Path(taskTmpDir)) == null) { + mWork.getPathToAliases().put(new Path(taskTmpDir), new ArrayList()); - mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir); - mWork.getPathToPartitionInfo().put(taskTmpDir, + mWork.getPathToAliases().get(new Path(taskTmpDir)).add(taskTmpDir); + mWork.getPathToPartitionInfo().put(new Path(taskTmpDir), new PartitionDesc(tt_desc, null)); mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos)); } @@ -447,8 +447,6 @@ public static void setTaskPlan(String alias_id, * current alias * @param topOp * the top operator of the stack - * @param plan - * current plan * @param local * whether you need to add to map-reduce or local work * @param opProcCtx @@ -475,8 +473,6 @@ public static void setTaskPlan(String alias_id, * map work to initialize * @param local * whether you need to add to map-reduce or local work - * @param pList - * pruned partition list. If it is null it will be computed on-the-fly. 
* @param inputs * read entities for the map work * @param conf @@ -713,7 +709,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set topOp, MapWork plan, boolean local, TableDesc tt_desc) throws SemanticException { @@ -952,8 +946,8 @@ public static MapredWork getMapRedWorkFromConf(HiveConf conf) { conf.getBoolVar( HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); work.setMapperCannotSpanPartns(mapperCannotSpanPartns); - work.setPathToAliases(new LinkedHashMap>()); - work.setPathToPartitionInfo(new LinkedHashMap()); + work.setPathToAliases(new LinkedHashMap>()); + work.setPathToPartitionInfo(new LinkedHashMap()); work.setAliasToWork(new LinkedHashMap>()); work.setHadoopSupportsSplittable( conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE)); @@ -1104,7 +1098,7 @@ private static void splitTasks(ReduceSinkOperator op, } // Add the path to alias mapping - setTaskPlan(taskTmpDir.toUri().toString(), streamDesc, tableScanOp, cplan.getMapWork(), false, tt_desc); + setTaskPlan(taskTmpDir, streamDesc, tableScanOp, cplan.getMapWork(), false, tt_desc); opProcCtx.setCurrTopOp(null); opProcCtx.setCurrAliasId(null); opProcCtx.setCurrTask(childTask); @@ -1132,13 +1126,13 @@ static boolean hasBranchFinished(Object... children) { */ public static void replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target) { - Map> sourcePathToAliases = source.getPathToAliases(); - Map sourcePathToPartitionInfo = source.getPathToPartitionInfo(); + Map> sourcePathToAliases = source.getPathToAliases(); + Map sourcePathToPartitionInfo = source.getPathToPartitionInfo(); Map> sourceAliasToWork = source.getAliasToWork(); Map sourceAliasToPartnInfo = source.getAliasToPartnInfo(); - Map> targetPathToAliases = target.getPathToAliases(); - Map targetPathToPartitionInfo = target.getPathToPartitionInfo(); + Map> targetPathToAliases = target.getPathToAliases(); + Map targetPathToPartitionInfo = target.getPathToPartitionInfo(); Map> targetAliasToWork = target.getAliasToWork(); Map targetAliasToPartnInfo = target.getAliasToPartnInfo(); @@ -1159,15 +1153,15 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, // Remove unnecessary information from target targetAliasToWork.remove(targetAlias); targetAliasToPartnInfo.remove(targetAlias); - List pathsToRemove = new ArrayList(); - for (Entry> entry: targetPathToAliases.entrySet()) { + List pathsToRemove = new ArrayList(); + for (Entry> entry: targetPathToAliases.entrySet()) { ArrayList aliases = entry.getValue(); aliases.remove(targetAlias); if (aliases.isEmpty()) { pathsToRemove.add(entry.getKey()); } } - for (String pathToRemove: pathsToRemove) { + for (Path pathToRemove: pathsToRemove) { targetPathToAliases.remove(pathToRemove); targetPathToPartitionInfo.remove(pathToRemove); } @@ -1176,14 +1170,14 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, targetAliasToWork.put(sourceAlias, sourceAliasToWork.get(sourceAlias)); targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo); targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo); - List pathsToAdd = new ArrayList(); - for (Entry> entry: sourcePathToAliases.entrySet()) { + List pathsToAdd = new ArrayList(); + for (Entry> entry: sourcePathToAliases.entrySet()) { ArrayList aliases = entry.getValue(); if (aliases.contains(sourceAlias)) { pathsToAdd.add(entry.getKey()); } } - for (String pathToAdd: pathsToAdd) { + for (Path pathToAdd: pathsToAdd) { if (!targetPathToAliases.containsKey(pathToAdd)) { 
targetPathToAliases.put(pathToAdd, new ArrayList()); } @@ -1193,7 +1187,6 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, /** * @param fsInput The FileSink operator. - * @param ctx The MR processing context. * @param finalName the final destination path the merge job should output. * @param dependencyTask * @param mvTasks @@ -1506,10 +1499,6 @@ public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) * the table scan operator that is the root of the MapReduce task. * @param fsDesc * the file sink descriptor that serves as the input to this merge task. - * @param parentMR - * the parent MapReduce work - * @param parentFS - * the last FileSinkOperator in the parent MapReduce work * @return the MapredWork */ private static MapWork createMRWorkForMergingFiles (HiveConf conf, @@ -1523,8 +1512,8 @@ private static MapWork createMRWorkForMergingFiles (HiveConf conf, // constructing the default MapredWork MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf); MapWork cplan = cMrPlan.getMapWork(); - cplan.getPathToAliases().put(inputDir, aliases); - cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null)); + cplan.getPathToAliases().put(new Path(inputDir), aliases); + cplan.getPathToPartitionInfo().put(new Path(inputDir), new PartitionDesc(tblDesc, null)); cplan.getAliasToWork().put(inputDir, topOp); cplan.setMapperCannotSpanPartns(true); @@ -1571,14 +1560,14 @@ public static MapWork createMergeTask(FileSinkDesc fsInputDesc, // create the merge file work MergeFileWork work = new MergeFileWork(inputDirs, finalName, hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName()); - LinkedHashMap> pathToAliases = - new LinkedHashMap>(); - pathToAliases.put(inputDir.toString(), inputDirstr); + LinkedHashMap> pathToAliases = + new LinkedHashMap>(); + pathToAliases.put(inputDir, inputDirstr); work.setMapperCannotSpanPartns(true); work.setPathToAliases(pathToAliases); PartitionDesc pDesc = new PartitionDesc(tblDesc, null); pDesc.setInputFileFormatClass(internalIFClass); - work.getPathToPartitionInfo().put(inputDir.toString(), pDesc); + work.getPathToPartitionInfo().put(inputDir, pDesc); work.setListBucketingCtx(fsInputDesc.getLbCtx()); // create alias to work which contains the merge operator diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index f8f2b7b..72944e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -144,13 +144,13 @@ private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ smallTableAliasList.add(alias); // get input path and remove this alias from pathToAlias // because this file will be fetched by fetch operator - LinkedHashMap> pathToAliases = newWork.getMapWork().getPathToAliases(); + LinkedHashMap> pathToAliases = newWork.getMapWork().getPathToAliases(); // keep record all the input path for this alias - HashSet pathSet = new HashSet(); - HashSet emptyPath = new HashSet(); - for (Map.Entry> entry2 : pathToAliases.entrySet()) { - String path = entry2.getKey(); + HashSet pathSet = new HashSet(); + HashSet emptyPath = new HashSet(); + for (Map.Entry> entry2 : pathToAliases.entrySet()) { + Path path = entry2.getKey(); ArrayList list = entry2.getValue(); if (list.contains(alias)) { // add to path set @@ -163,7 +163,7 @@ private static void 
genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ } } //remove the path, with which no alias associates - for (String path : emptyPath) { + for (Path path : emptyPath) { pathToAliases.remove(path); } @@ -172,15 +172,15 @@ private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ List partDir = new ArrayList(); List partDesc = new ArrayList(); - for (String tablePath : pathSet) { + for (Path tablePath : pathSet) { PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath); // create fetchwork for non partitioned table if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) { - fetchWork = new FetchWork(new Path(tablePath), partitionDesc.getTableDesc()); + fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc()); break; } // if table is partitioned,add partDir and partitionDesc - partDir.add(new Path(tablePath)); + partDir.add(tablePath); partDesc.add(partitionDesc); } // create fetchwork for partitioned table @@ -274,11 +274,8 @@ private static void validateMapJoinTypes(Operator op) /** * convert a regular join to a a map-side join. * - * @param opParseCtxMap * @param op * join operator - * @param joinTree - * qb join tree * @param mapJoinPos * position of the source to be read as part of map-reduce framework. All other sources * are cached in memory @@ -408,11 +405,8 @@ private static boolean needValueIndex(int[] valueIndex) { /** * convert a sortmerge join to a a map-side join. * - * @param opParseCtxMap * @param smbJoinOp * join operator - * @param joinTree - * qb join tree * @param bigTablePos * position of the source to be read as part of map-reduce framework. All other sources * are cached in memory diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java index a3a7f42..0f90dc5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java @@ -25,6 +25,7 @@ import java.util.Stack; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.Task; @@ -118,7 +119,7 @@ protected void replaceTask( } public long getTotalKnownInputSize(Context context, MapWork currWork, - Map> pathToAliases, + Map> pathToAliases, HashMap aliasToSize) throws SemanticException { try { // go over all the input paths, and calculate a known total size, known @@ -129,8 +130,8 @@ public long getTotalKnownInputSize(Context context, MapWork currWork, // is chosen as big table, what's the total size of left tables, which // are going to be small tables. 
long aliasTotalKnownInputSize = 0L; - for (Map.Entry> entry : pathToAliases.entrySet()) { - String path = entry.getKey(); + for (Map.Entry> entry : pathToAliases.entrySet()) { + Path path = entry.getKey(); List aliasList = entry.getValue(); ContentSummary cs = context.getCS(path); if (cs != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java index 296fecb..b0be340 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -42,20 +43,20 @@ private Map, List> bucketedColsByOp; // A mapping from a directory which a FileSinkOperator writes into to the columns by which that // output is bucketed - private Map> bucketedColsByDirectory; + private Map> bucketedColsByDirectory; // A mapping from an operator to the columns by which it's output is sorted private Map, List> sortedColsByOp; // A mapping from a directory which a FileSinkOperator writes into to the columns by which that // output is sorted - private Map> sortedColsByDirectory; + private Map> sortedColsByDirectory; public BucketingSortingCtx(boolean disableBucketing) { this.disableBucketing = disableBucketing; this.bucketedColsByOp = new HashMap, List>(); - this.bucketedColsByDirectory = new HashMap>(); + this.bucketedColsByDirectory = new HashMap>(); this.sortedColsByOp = new HashMap, List>(); - this.sortedColsByDirectory = new HashMap>(); + this.sortedColsByDirectory = new HashMap>(); } @@ -70,12 +71,12 @@ public void setBucketedCols(Operator op, List } } - public Map> getBucketedColsByDirectory() { + public Map> getBucketedColsByDirectory() { return disableBucketing ? 
null : bucketedColsByDirectory; } - public void setBucketedColsByDirectory(Map> bucketedColsByDirectory) { + public void setBucketedColsByDirectory(Map> bucketedColsByDirectory) { if (!disableBucketing) { this.bucketedColsByDirectory = bucketedColsByDirectory; } @@ -91,12 +92,12 @@ public void setSortedCols(Operator op, List sor this.sortedColsByOp.put(op, sortedCols); } - public Map> getSortedColsByDirectory() { + public Map> getSortedColsByDirectory() { return sortedColsByDirectory; } - public void setSortedColsByDirectory(Map> sortedColsByDirectory) { + public void setSortedColsByDirectory(Map> sortedColsByDirectory) { this.sortedColsByDirectory = sortedColsByDirectory; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java index aa41200..beca82f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java @@ -429,7 +429,6 @@ private static int indexOfColName(List bucketSortCols, /** * This is used to construct new lists of sorted columns where the order of the columns * hasn't changed, only possibly the name - * @param bucketCols - input sorted columns * @param colInfos - List of column infos * @return output sorted columns */ @@ -468,7 +467,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Set the inferred bucket columns for the file this FileSink produces if (bucketCols != null) { List newBucketCols = getNewBucketCols(bucketCols, colInfos); - bctx.getBucketedColsByDirectory().put(fop.getConf().getDirName().toString(), newBucketCols); + bctx.getBucketedColsByDirectory().put(fop.getConf().getDirName(), newBucketCols); bctx.setBucketedCols(fop, newBucketCols); } @@ -477,7 +476,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Set the inferred sort columns for the file this FileSink produces if (sortCols != null) { List newSortCols = getNewSortCols(sortCols, colInfos); - bctx.getSortedColsByDirectory().put(fop.getConf().getDirName().toString(), newSortCols); + bctx.getSortedColsByDirectory().put(fop.getConf().getDirName(), newSortCols); bctx.setSortedCols(fop, newSortCols); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 1f6b5d7..b8bb39d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -235,7 +236,7 @@ private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Conf } // The mapJoinTaskFileSinkOperator writes to a different directory - String childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName().toString(); + Path childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName(); List childMRAliases = childMapWork.getPathToAliases().get(childMRPath); if (childMRAliases == null || childMRAliases.size() != 1) { return; @@ -243,8 +244,8 @@ private void 
mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Conf String childMRAlias = childMRAliases.get(0); // Sanity check to make sure there is no alias conflict after merge. - for (Entry> entry : childMapWork.getPathToAliases().entrySet()) { - String path = entry.getKey(); + for (Entry> entry : childMapWork.getPathToAliases().entrySet()) { + Path path = entry.getKey(); List aliases = entry.getValue(); if (path.equals(childMRPath)) { @@ -394,7 +395,7 @@ public static boolean cannotConvert(long aliasKnownSize, // Must be deterministic order map for consistent q-test output across Java versions HashMap, Set> taskToAliases = new LinkedHashMap, Set>(); - HashMap> pathToAliases = currWork.getPathToAliases(); + HashMap> pathToAliases = currWork.getPathToAliases(); Map> aliasToWork = currWork.getAliasToWork(); // get parseCtx for this Join Operator diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 15f0d70..08472ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -268,12 +268,12 @@ public static void processSkewJoin(JoinOperator joinOp, String alias = src.toString(); aliases.add(alias); Path bigKeyDirPath = bigKeysDirMap.get(src); - newPlan.getPathToAliases().put(bigKeyDirPath.toString(), aliases); + newPlan.getPathToAliases().put(bigKeyDirPath, aliases); newPlan.getAliasToWork().put(alias, tblScan_op); PartitionDesc part = new PartitionDesc(tableDescList.get(src), null); - newPlan.getPathToPartitionInfo().put(bigKeyDirPath.toString(), part); + newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part); newPlan.getAliasToPartnInfo().put(alias, part); Operator reducer = clonePlan.getReduceWork().getReducer(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index f88fd0a..52dd3e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -292,10 +292,10 @@ public static void processSkewJoin(JoinOperator joinOp, Task aliasesAffected, + private void processAlias(MapWork work, Path path, ArrayList aliasesAffected, ArrayList aliases) { // the aliases that are allowed to map to a null scan. 
ArrayList allowed = new ArrayList(); @@ -109,8 +109,8 @@ private void processAlias(MapWork work, String path, ArrayList aliasesAf PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc); Path fakePath = new Path(physicalContext.getContext().getMRTmpPath() + newPartition.getTableName() + encode(newPartition.getPartSpec())); - work.getPathToPartitionInfo().put(fakePath.getName(), newPartition); - work.getPathToAliases().put(fakePath.getName(), new ArrayList(allowed)); + work.getPathToPartitionInfo().put(fakePath, newPartition); + work.getPathToAliases().put(fakePath, new ArrayList(allowed)); aliasesAffected.removeAll(allowed); if (aliasesAffected.isEmpty()) { work.getPathToAliases().remove(path); @@ -129,14 +129,14 @@ private void processAlias(MapWork work, HashSet tableScans) { tso.getConf().setIsMetadataOnly(true); } // group path alias according to work - LinkedHashMap> candidates = new LinkedHashMap>(); - for (String path : work.getPaths()) { + LinkedHashMap> candidates = new LinkedHashMap>(); + for (Path path : work.getPaths()) { ArrayList aliasesAffected = work.getPathToAliases().get(path); if (aliasesAffected != null && aliasesAffected.size() > 0) { candidates.put(path, aliasesAffected); } } - for (Entry> entry : candidates.entrySet()) { + for (Entry> entry : candidates.entrySet()) { processAlias(work, entry.getKey(), entry.getValue(), aliases); } } @@ -203,4 +203,4 @@ public int compare(MapWork o1, MapWork o2) { } return null; } -} \ No newline at end of file +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 3b09c2f..daf9a95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -75,9 +75,9 @@ public SortMergeJoinTaskDispatcher(PhysicalContext context) { private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { // Remove the paths which are not part of aliasToPartitionInfo Map aliasToPartitionInfo = currWork.getAliasToPartnInfo(); - List removePaths = new ArrayList(); + List removePaths = new ArrayList(); - for (Map.Entry> entry : currWork.getPathToAliases().entrySet()) { + for (Map.Entry> entry : currWork.getPathToAliases().entrySet()) { boolean keepPath = false; for (String alias : entry.getValue()) { if (aliasToPartitionInfo.containsKey(alias)) { @@ -93,7 +93,7 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { } List removeAliases = new ArrayList(); - for (String removePath : removePaths) { + for (Path removePath : removePaths) { removeAliases.addAll(currWork.getPathToAliases().get(removePath)); currWork.getPathToAliases().remove(removePath); currWork.getPathToPartitionInfo().remove(removePath); @@ -118,10 +118,10 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { PartitionDesc partitionInfo = currWork.getAliasToPartnInfo().get(alias); if (fetchWork.getTblDir() != null) { - currWork.mergeAliasedInput(alias, fetchWork.getTblDir().toUri().toString(), partitionInfo); + currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), partitionInfo); } else { for (Path pathDir : fetchWork.getPartDir()) { - currWork.mergeAliasedInput(alias, pathDir.toUri().toString(), partitionInfo); + currWork.mergeAliasedInput(alias, pathDir, partitionInfo); } } } @@ -264,7 +264,7 @@ private boolean 
isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) HashMap, Set> taskToAliases = new LinkedHashMap, Set>(); // Note that pathToAlias will behave as if the original plan was a join plan - HashMap> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); + HashMap> pathToAliases = currJoinWork.getMapWork().getPathToAliases(); // generate a map join task for the big table SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 5d010cc..c0c684f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; @@ -523,10 +524,10 @@ private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String al // Validate the input format VectorPartitionConversion partitionConversion = new VectorPartitionConversion(); - LinkedHashMap> pathToAliases = mapWork.getPathToAliases(); - LinkedHashMap pathToPartitionInfo = mapWork.getPathToPartitionInfo(); - for (Entry> entry: pathToAliases.entrySet()) { - String path = entry.getKey(); + LinkedHashMap> pathToAliases = mapWork.getPathToAliases(); + LinkedHashMap pathToPartitionInfo = mapWork.getPathToPartitionInfo(); + for (Entry> entry: pathToAliases.entrySet()) { + Path path = entry.getKey(); List aliases = entry.getValue(); boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1); if (!isPresent) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java index 5990d17..abc9fcf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java @@ -166,7 +166,7 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork, streamDesc = origStreamDesc.concat(String.valueOf(++pos)); } } - GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc, + GenMapRedUtils.setTaskPlan(taskTmpDir, streamDesc, tableScanOp, mapWork, false, tableDesc); // insert the new task between current task and its child @SuppressWarnings("unchecked") diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java index 1da7f85..ff1b0ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java @@ -50,7 +50,7 @@ private static final long serialVersionUID = 1L; private HashMap, Set> taskToAliases; - HashMap> pathToAliases; + HashMap> pathToAliases; HashMap aliasToKnownSize; private Task commonJoinTask; @@ -85,11 +85,11 @@ public void setAliasToKnownSize(HashMap aliasToKnownSize) { this.aliasToKnownSize = aliasToKnownSize; } - public HashMap> getPathToAliases() { + public HashMap> getPathToAliases() { return pathToAliases; } - public void 
setPathToAliases(HashMap> pathToAliases) { + public void setPathToAliases(HashMap> pathToAliases) { this.pathToAliases = pathToAliases; } @@ -166,7 +166,7 @@ public ConditionalResolverCommonJoin() { Set participants = getParticipants(ctx); Map aliasToKnownSize = ctx.getAliasToKnownSize(); - Map> pathToAliases = ctx.getPathToAliases(); + Map> pathToAliases = ctx.getPathToAliases(); Map, Set> taskToAliases = ctx.getTaskToAliases(); long threshold = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); @@ -212,10 +212,10 @@ protected void resolveUnknownSizes(ConditionalResolverCommonJoinCtx ctx, HiveCon Set aliases = getParticipants(ctx); Map aliasToKnownSize = ctx.getAliasToKnownSize(); - Map> pathToAliases = ctx.getPathToAliases(); + Map> pathToAliases = ctx.getPathToAliases(); - Set unknownPaths = new HashSet(); - for (Map.Entry> entry : pathToAliases.entrySet()) { + Set unknownPaths = new HashSet(); + for (Map.Entry> entry : pathToAliases.entrySet()) { for (String alias : entry.getValue()) { if (aliases.contains(alias) && !aliasToKnownSize.containsKey(alias)) { unknownPaths.add(entry.getKey()); @@ -227,13 +227,12 @@ protected void resolveUnknownSizes(ConditionalResolverCommonJoinCtx ctx, HiveCon Path localTmpDir = ctx.getLocalTmpDir(); // need to compute the input size at runtime, and select the biggest as // the big table. - for (String p : unknownPaths) { + for (Path path : unknownPaths) { // this path is intermediate data - if (p.startsWith(hdfsTmpDir.toString()) || p.startsWith(localTmpDir.toString())) { - Path path = new Path(p); + if (isParent(hdfsTmpDir,path) || isParent(localTmpDir,path)) { FileSystem fs = path.getFileSystem(conf); long fileSize = fs.getContentSummary(path).getLength(); - for (String alias : pathToAliases.get(p)) { + for (String alias : pathToAliases.get(path)) { Long length = aliasToKnownSize.get(alias); if (length == null) { aliasToKnownSize.put(alias, fileSize); @@ -242,4 +241,17 @@ protected void resolveUnknownSizes(ConditionalResolverCommonJoinCtx ctx, HiveCon } } } + + private static boolean isParent(Path parentPath, Path path){ + boolean match = false; + + while (path!=null){ + if(new Path(path.toUri().getPath()).equals(new Path(parentPath.toUri().getPath()))){ + match = true; + break; + } + path = path.getParent(); + } + return match; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 3f07ea7..f18fabb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -238,15 +238,15 @@ private void generateActualTasks(HiveConf conf, List ptpi = work.getPathToPartitionInfo(); + Map ptpi = work.getPathToPartitionInfo(); assert ptpi.size() == 1; - String path = ptpi.keySet().iterator().next(); + Path path = ptpi.keySet().iterator().next(); PartitionDesc partDesc = ptpi.get(path); TableDesc tblDesc = partDesc.getTableDesc(); ptpi.remove(path); // the root path is not useful anymore // cleanup pathToAliases - Map> pta = work.getPathToAliases(); + Map> pta = work.getPathToAliases(); assert pta.size() == 1; path = pta.keySet().iterator().next(); ArrayList aliases = pta.get(path); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 2cb9257..2256349 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -67,9 +67,9 @@ // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing - private LinkedHashMap> pathToAliases = new LinkedHashMap>(); + private LinkedHashMap> pathToAliases = new LinkedHashMap>(); - private LinkedHashMap pathToPartitionInfo = new LinkedHashMap(); + private LinkedHashMap pathToPartitionInfo = new LinkedHashMap(); private LinkedHashMap> aliasToWork = new LinkedHashMap>(); @@ -80,10 +80,10 @@ // If this map task has a FileSinkOperator, and bucketing/sorting metadata can be // inferred about the data being written by that operator, these are mappings from the directory // that operator writes into to the bucket/sort columns for that data. - private final Map> bucketedColsByDirectory = - new HashMap>(); - private final Map> sortedColsByDirectory = - new HashMap>(); + private final Map> bucketedColsByDirectory = + new HashMap>(); + private final Map> sortedColsByDirectory = + new HashMap>(); private Path tmpHDFSPath; @@ -138,12 +138,12 @@ public MapWork(String name) { } @Explain(displayName = "Path -> Alias", explainLevels = { Level.EXTENDED }) - public LinkedHashMap> getPathToAliases() { + public LinkedHashMap> getPathToAliases() { return pathToAliases; } public void setPathToAliases( - final LinkedHashMap> pathToAliases) { + final LinkedHashMap> pathToAliases) { this.pathToAliases = pathToAliases; } @@ -159,14 +159,14 @@ public void setPathToAliases( * @return */ @Explain(displayName = "Truncated Path -> Alias", explainLevels = { Level.EXTENDED }) - public Map> getTruncatedPathToAliases() { - Map> trunPathToAliases = new LinkedHashMap> getTruncatedPathToAliases() { + Map> trunPathToAliases = new LinkedHashMap>(); - Iterator>> itr = this.pathToAliases.entrySet().iterator(); + Iterator>> itr = this.pathToAliases.entrySet().iterator(); while (itr.hasNext()) { - final Entry> entry = itr.next(); - String origiKey = entry.getKey(); - String newKey = PlanUtils.removePrefixFromWarehouseConfig(origiKey); + final Entry> entry = itr.next(); + Path origiKey = entry.getKey(); + Path newKey = new Path(PlanUtils.removePrefixFromWarehouseConfig(origiKey.toString())); ArrayList value = entry.getValue(); trunPathToAliases.put(newKey, value); } @@ -174,12 +174,12 @@ public void setPathToAliases( } @Explain(displayName = "Path -> Partition", explainLevels = { Level.EXTENDED }) - public LinkedHashMap getPathToPartitionInfo() { + public LinkedHashMap getPathToPartitionInfo() { return pathToPartitionInfo; } public void setPathToPartitionInfo( - final LinkedHashMap pathToPartitionInfo) { + final LinkedHashMap pathToPartitionInfo) { this.pathToPartitionInfo = pathToPartitionInfo; } @@ -190,7 +190,7 @@ public void setPathToPartitionInfo( */ public void deriveExplainAttributes() { if (pathToPartitionInfo != null) { - for (Map.Entry entry : pathToPartitionInfo + for (Map.Entry entry : pathToPartitionInfo .entrySet()) { entry.getValue().deriveBaseFileName(entry.getKey()); } @@ -212,12 +212,11 @@ public void internTable(Interner interner) { } } if (pathToPartitionInfo != null) { - for (PartitionDesc part : pathToPartitionInfo.values()) { - part.intern(interner); + for (PartitionDesc part : pathToPartitionInfo.values()) { + part.intern(interner); + } } } - } - /** * @return the aliasToPartnInfo */ @@ -261,7 +260,7 @@ public void setNumMapTasks(Integer numMapTasks) { } @SuppressWarnings("nls") - public void addMapWork(String path, String alias, Operator work, + public void addMapWork(Path path, String alias, 
Operator work, PartitionDesc pd) { ArrayList curAliases = pathToAliases.get(path); if (curAliases == null) { @@ -297,8 +296,8 @@ public void setInputFormatSorted(boolean inputFormatSorted) { public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, TableDesc tblDesc, ArrayList aliases, PartitionDesc partDesc) { - pathToAliases.put(path.toString(), aliases); - pathToPartitionInfo.put(path.toString(), partDesc); + pathToAliases.put(path, aliases); + pathToPartitionInfo.put(path, partDesc); } /** @@ -342,7 +341,7 @@ public void replaceRoots(Map, Operator> replacementMap) { return opSet; } - public void mergeAliasedInput(String alias, String pathDir, PartitionDesc partitionInfo) { + public void mergeAliasedInput(String alias, Path pathDir, PartitionDesc partitionInfo) { ArrayList aliases = pathToAliases.get(pathDir); if (aliases == null) { aliases = new ArrayList(Arrays.asList(alias)); @@ -441,8 +440,8 @@ public String getIndexIntermediateFile() { return new ArrayList>(aliasToWork.values()); } - public ArrayList getPaths() { - return new ArrayList(pathToAliases.keySet()); + public ArrayList getPaths() { + return new ArrayList(pathToAliases.keySet()); } public ArrayList getPartitionDescs() { @@ -471,12 +470,12 @@ public void mergingInto(MapWork mapWork) { } @Explain(displayName = "Path -> Bucketed Columns", explainLevels = { Level.EXTENDED }) - public Map> getBucketedColsByDirectory() { + public Map> getBucketedColsByDirectory() { return bucketedColsByDirectory; } @Explain(displayName = "Path -> Sorted Columns", explainLevels = { Level.EXTENDED }) - public Map> getSortedColsByDirectory() { + public Map> getSortedColsByDirectory() { return sortedColsByDirectory; } @@ -516,7 +515,7 @@ public void configureJobConf(JobConf job) { public void logPathToAliases() { if (LOG.isDebugEnabled()) { LOG.debug("LOGGING PATH TO ALIASES"); - for (Map.Entry> entry: pathToAliases.entrySet()) { + for (Map.Entry> entry: pathToAliases.entrySet()) { for (String a: entry.getValue()) { LOG.debug("Path: " + entry.getKey() + ", Alias: " + a); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index b032349..fdfaf26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -286,19 +286,18 @@ public PartitionDesc clone() { * @param path * URI to the partition file */ - public void deriveBaseFileName(String path) { + public void deriveBaseFileName(Path path) { PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc); if (path == null) { return; } try { - Path p = new Path(path); - baseFileName = p.getName(); + baseFileName = path.getName(); } catch (Exception ex) { // don't really care about the exception. 
the goal is to capture the // the last component at the minimum - so set to the complete path - baseFileName = path; + baseFileName = path.toUri().toString(); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 151a946..99622b1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -107,7 +107,7 @@ public void tearDown() throws Exception { FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); } } - @Test + @Test public void testInsertOverwrite() throws Exception { runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2); runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index ca59e90..2a0d2bd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -156,7 +156,7 @@ protected void setUp() { } public static void addMapWork(MapredWork mr, Table tbl, String alias, Operator work) { - mr.getMapWork().addMapWork(tbl.getDataLocation().toString(), alias, work, new PartitionDesc( + mr.getMapWork().addMapWork(tbl.getDataLocation(), alias, work, new PartitionDesc( Utilities.getTableDesc(tbl), null)); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index bf122e0..c3d1d14 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -295,17 +295,17 @@ public void testMapOperator() throws Throwable { ArrayList aliases = new ArrayList(); aliases.add("a"); aliases.add("b"); - LinkedHashMap> pathToAliases = - new LinkedHashMap>(); - pathToAliases.put("hdfs:///testDir", aliases); + LinkedHashMap> pathToAliases = + new LinkedHashMap>(); + pathToAliases.put(new Path("hdfs:///testDir"), aliases); // initialize pathToTableInfo // Default: treat the table as a single column "col" TableDesc td = Utilities.defaultTd; PartitionDesc pd = new PartitionDesc(td, null); - LinkedHashMap pathToPartitionInfo = - new LinkedHashMap(); - pathToPartitionInfo.put("hdfs:///testDir", pd); + LinkedHashMap pathToPartitionInfo = + new LinkedHashMap(); + pathToPartitionInfo.put(new Path("hdfs:///testDir"), pd); // initialize aliasToWork CollectDesc cd = new CollectDesc(Integer.valueOf(1)); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java index 1364888..3385e95 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java @@ -63,13 +63,13 @@ public void testPlan() throws Exception { ArrayList aliasList = new ArrayList(); aliasList.add("a"); - LinkedHashMap> pa = new LinkedHashMap>(); - pa.put("/tmp/testfolder", aliasList); + LinkedHashMap> pa = new LinkedHashMap>(); + pa.put(new Path("/tmp/testfolder"), aliasList); TableDesc tblDesc = Utilities.defaultTd; PartitionDesc partDesc = new PartitionDesc(tblDesc, null); - LinkedHashMap pt = new LinkedHashMap(); - pt.put("/tmp/testfolder", partDesc); + LinkedHashMap pt = new 
LinkedHashMap(); + pt.put(new Path("/tmp/testfolder"), partDesc); LinkedHashMap> ao = new LinkedHashMap>(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 858cca0..b16441a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -138,11 +138,11 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { mws[0].setAliasToWork(map); mws[1].setAliasToWork(map); - LinkedHashMap> pathMap - = new LinkedHashMap>(); + LinkedHashMap> pathMap + = new LinkedHashMap>(); ArrayList aliasList = new ArrayList(); aliasList.add("foo"); - pathMap.put("foo", aliasList); + pathMap.put(new Path("foo"), aliasList); mws[0].setPathToAliases(pathMap); mws[1].setPathToAliases(pathMap); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java index 7a1748c..5d68ba9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java @@ -137,8 +137,8 @@ private void init() throws IOException { TableDesc tblDesc = Utilities.defaultTd; PartitionDesc partDesc = new PartitionDesc(tblDesc, null); - LinkedHashMap pt = new LinkedHashMap(); - pt.put("/tmp/testfolder", partDesc); + LinkedHashMap pt = new LinkedHashMap(); + pt.put(new Path("/tmp/testfolder"), partDesc); MapredWork mrwork = new MapredWork(); mrwork.getMapWork().setPathToPartitionInfo(pt); Utilities.setMapRedWork(conf, mrwork,new Path("/tmp/" + System.getProperty("user.name"), "hive")); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveFileFormatUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveFileFormatUtils.java index efc18ee..f12ceaa 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveFileFormatUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveFileFormatUtils.java @@ -36,16 +36,16 @@ public void testGetPartitionDescFromPathRecursively() throws IOException { PartitionDesc partDesc_5 = new PartitionDesc(); PartitionDesc partDesc_6 = new PartitionDesc(); - Map pathToPartitionInfo = new HashMap(); + Map pathToPartitionInfo = new HashMap(); pathToPartitionInfo.put( - new Path("file:///tbl/par1/part2/part3").toString(), partDesc_3); - pathToPartitionInfo.put(new Path("/tbl/par1/part2/part4").toString(), + new Path("file:///tbl/par1/part2/part3"), partDesc_3); + pathToPartitionInfo.put(new Path("/tbl/par1/part2/part4"), partDesc_4); - pathToPartitionInfo.put(new Path("/tbl/par1/part2/part5/").toString(), + pathToPartitionInfo.put(new Path("/tbl/par1/part2/part5/"), partDesc_5); pathToPartitionInfo.put(new Path("hdfs:///tbl/par1/part2/part6/") - .toString(), partDesc_6); + , partDesc_6); // first group PartitionDesc ret = null; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java index 6f0b9df..abf1347 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java @@ -77,8 +77,8 @@ protected void setUp() throws IOException { TableDesc tblDesc = Utilities.defaultTd; PartitionDesc partDesc = new PartitionDesc(tblDesc, null); - LinkedHashMap pt = new LinkedHashMap(); - pt.put("/tmp/testfolder", 
partDesc);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<Path, PartitionDesc>();
+    pt.put(new Path("/tmp/testfolder"), partDesc);
     MapredWork mrwork = new MapredWork();
     mrwork.getMapWork().setPathToPartitionInfo(pt);
     Utilities.setMapRedWork(job, mrwork,new Path("/tmp/" + System.getProperty("user.name"), "hive"));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index c0fcedc..f9dd40b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -1465,10 +1465,10 @@ JobConf createMockExecutionEnvironment(Path workDir,
     // clean out previous contents
     ((MockFileSystem) root.getFileSystem(conf)).clear();
     // build partition strings
-    String[] partPath = new String[partitions];
+    Path[] partPath = new Path[partitions];
     StringBuilder buffer = new StringBuilder();
     for(int p=0; p < partitions; ++p) {
-      partPath[p] = new Path(root, "p=" + p).toString();
+      partPath[p] = new Path(root, "p=" + p);
       if (p != 0) {
         buffer.append(',');
       }
@@ -1514,12 +1514,12 @@ JobConf createMockExecutionEnvironment(Path workDir,
       mapWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
     }
     mapWork.setUseBucketizedHiveInputFormat(false);
-    LinkedHashMap<String, ArrayList<String>> aliasMap =
-        new LinkedHashMap<String, ArrayList<String>>();
+    LinkedHashMap<Path, ArrayList<String>> aliasMap =
+        new LinkedHashMap<Path, ArrayList<String>>();
     ArrayList<String> aliases = new ArrayList<String>();
     aliases.add(tableName);
-    LinkedHashMap<String, PartitionDesc> partMap =
-        new LinkedHashMap<String, PartitionDesc>();
+    LinkedHashMap<Path, PartitionDesc> partMap =
+        new LinkedHashMap<Path, PartitionDesc>();
     for(int p=0; p < partitions; ++p) {
       aliasMap.put(partPath[p], aliases);
       LinkedHashMap<String, String> partSpec =
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java
index ef846a6..ee521d6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestConditionalResolverCommonJoin.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -38,9 +39,9 @@ public void testResolvingDriverAlias() throws Exception {
 
     ConditionalResolverCommonJoin resolver = new ConditionalResolverCommonJoin();
 
-    HashMap<String, ArrayList<String>> pathToAliases = new HashMap<String, ArrayList<String>>();
-    pathToAliases.put("path1", new ArrayList<String>(Arrays.asList("alias1", "alias2")));
-    pathToAliases.put("path2", new ArrayList<String>(Arrays.asList("alias3")));
+    HashMap<Path, ArrayList<String>> pathToAliases = new HashMap<Path, ArrayList<String>>();
+    pathToAliases.put(new Path("path1"), new ArrayList<String>(Arrays.asList("alias1", "alias2")));
+    pathToAliases.put(new Path("path2"), new ArrayList<String>(Arrays.asList("alias3")));
 
     HashMap<String, Long> aliasToKnownSize = new HashMap<String, Long>();
     aliasToKnownSize.put("alias1", 1024l);
@@ -84,4 +85,4 @@ public void testResolvingDriverAlias() throws Exception {
     resolved = resolver.resolveMapJoinTask(ctx, conf);
     Assert.assertNull(resolved);
   }
-}
\ No newline at end of file
+}
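
Note on the new ConditionalResolverCommonJoin.isParent() helper introduced earlier in this patch: it replaces the old String.startsWith() check by walking up Path.getParent() and comparing only the scheme-stripped path component (path.toUri().getPath()), so a fully qualified hdfs:// intermediate path can still match a scratch directory recorded without a scheme. A minimal standalone sketch of that behaviour follows; it is not part of the patch, and the class name and sample paths are invented for illustration only.

    import org.apache.hadoop.fs.Path;

    public class PathParentCheckSketch {
      // Same idea as the patched isParent(): walk up from 'path' and compare only
      // the path component of each URI, ignoring scheme and authority.
      static boolean isParent(Path parentPath, Path path) {
        Path target = new Path(parentPath.toUri().getPath());
        while (path != null) {
          if (new Path(path.toUri().getPath()).equals(target)) {
            return true;
          }
          path = path.getParent(); // returns null once the root is reached
        }
        return false;
      }

      public static void main(String[] args) {
        Path scratch = new Path("/tmp/hive/scratch");
        Path intermediate = new Path("hdfs://nn:8020/tmp/hive/scratch/-mr-10003/000000_0");
        Path warehouse = new Path("hdfs://nn:8020/user/hive/warehouse/t1");
        System.out.println(isParent(scratch, intermediate)); // true: scheme/authority ignored
        System.out.println(isParent(scratch, warehouse));    // false: not under the scratch dir
      }
    }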