Index: hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java =================================================================== --- hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (revision 1553793) +++ hcatalog/core/src/test/java/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (working copy) @@ -377,7 +377,7 @@ org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table); FetchWork work; if (tbl.getPartCols().isEmpty()) { - work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl)); + work = new FetchWork(new Path(tbl.getDataLocation()), Utilities.getTableDesc(tbl)); } else { List partitions = hive.getPartitions(tbl); List partDesc = new ArrayList(); Index: hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java =================================================================== --- hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (revision 1553793) +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (working copy) @@ -387,7 +387,7 @@ work = new FetchWork(partLocs, partDesc, Utilities.getTableDesc(tbl)); work.setLimit(100); } else { - work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl)); + work = new FetchWork(new Path(tbl.getDataLocation()), Utilities.getTableDesc(tbl)); } FetchTask task = new FetchTask(); task.setWork(work); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy) @@ -620,16 +620,15 @@ // find the move task for (Task mvTsk : mvTasks) { MoveWork mvWork = mvTsk.getWork(); - String srcDir = null; + Path 
srcDir = null; if (mvWork.getLoadFileWork() != null) { - srcDir = mvWork.getLoadFileWork().getSourceDir(); + srcDir = mvWork.getLoadFileWork().getSourcePath(); } else if (mvWork.getLoadTableWork() != null) { - srcDir = mvWork.getLoadTableWork().getSourceDir(); + srcDir = mvWork.getLoadTableWork().getSourcePath(); } - String fsOpDirName = fsOp.getConf().getFinalDirName(); if ((srcDir != null) - && (srcDir.equalsIgnoreCase(fsOpDirName))) { + && (srcDir.equals(new Path(fsOp.getConf().getFinalDirName())))) { return mvTsk; } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy) @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -196,7 +197,7 @@ PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath); // create fetchwork for non partitioned table if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) { - fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc()); + fetchWork = new FetchWork(new Path(tablePath), partitionDesc.getTableDesc()); break; } // if table is partitioned,add partDir and partitionDesc Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (working copy) @@ -232,8 +232,7 @@ inputs.clear(); if 
(!table.isPartitioned()) { inputs.add(new ReadEntity(table)); - String path = table.getPath().toString(); - FetchWork work = new FetchWork(path, Utilities.getTableDesc(table)); + FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table)); PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc()); work.setSplitSample(splitSample); return work; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -697,7 +697,7 @@ new FetchWork(FetchWork.convertPathToStringArray(partDir), partDesc, tblDesc)); } else { localPlan.getAliasToFetchWork().put(alias_id, - new FetchWork(tblDir.toString(), tblDesc)); + new FetchWork(tblDir, tblDesc)); } plan.setMapLocalWork(localPlan); } @@ -745,7 +745,7 @@ assert localPlan.getAliasToWork().get(alias) == null; assert localPlan.getAliasToFetchWork().get(alias) == null; localPlan.getAliasToWork().put(alias, topOp); - localPlan.getAliasToFetchWork().put(alias, new FetchWork(alias, tt_desc)); + localPlan.getAliasToFetchWork().put(alias, new FetchWork(new Path(alias), tt_desc)); plan.setMapLocalWork(localPlan); } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (working copy) @@ -118,7 +118,7 @@ PartitionDesc partitionInfo = currWork.getAliasToPartnInfo().get(alias); if (fetchWork.getTblDir() != null) { - currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), partitionInfo); + currWork.mergeAliasedInput(alias, 
fetchWork.getTblDir().toUri().toString(), partitionInfo); } else { for (String pathDir : fetchWork.getPartDir()) { currWork.mergeAliasedInput(alias, pathDir, partitionInfo); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (working copy) @@ -308,7 +308,7 @@ localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2); Path tblDir = new Path(smallTblDirs.get(small_alias)); localPlan.getAliasToFetchWork().put(small_alias.toString(), - new FetchWork(tblDir.toString(), tableDescList.get(small_alias))); + new FetchWork(tblDir, tableDescList.get(small_alias))); } newPlan.setMapLocalWork(localPlan); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (working copy) @@ -37,7 +37,7 @@ public class FetchWork implements Serializable { private static final long serialVersionUID = 1L; - private String tblDir; + private Path tblDir; /* not transient: Kryo skips transient fields */ private TableDesc tblDesc; private ArrayList partDir; @@ -75,11 +75,11 @@ return rowsComputedFromStats; } - public FetchWork(String tblDir, TableDesc tblDesc) { + public FetchWork(Path tblDir, TableDesc tblDesc) { this(tblDir, tblDesc, -1); } - public FetchWork(String tblDir, TableDesc tblDesc, int limit) { + public FetchWork(Path tblDir, TableDesc tblDesc, int limit) { this.tblDir = tblDir; this.tblDesc = tblDesc; this.limit = limit; @@ -124,22 +124,15 @@ /** * @return the tblDir */ - public String getTblDir() { + public Path getTblDir() { return tblDir; } /** - * @return the tblDir - */ - public Path 
getTblDirPath() { - return new Path(tblDir); - } - - /** * @param tblDir * the tblDir to set */ - public void setTblDir(String tblDir) { + public void setTblDir(Path tblDir) { this.tblDir = tblDir; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java (working copy) @@ -37,11 +37,7 @@ this.sourcePath = sourcePath; } - @Explain(displayName = "source", normalExplain = false) - public String getSourceDir() { - return sourcePath.toString(); - } - + @Explain(displayName = "source", normalExplain = false) public Path getSourcePath() { return sourcePath; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (working copy) @@ -149,7 +149,7 @@ resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); } - FetchWork fetch = new FetchWork(new Path(loadFileDesc.getSourceDir()).toString(), + FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, qb.getParseInfo().getOuterQueryLimit()); fetch.setSource(pCtx.getFetchSource()); fetch.setSink(pCtx.getFetchSink()); @@ -480,7 +480,7 @@ String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); - fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), + fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, qb.getParseInfo().getOuterQueryLimit()); ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName, Index: 
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -5658,7 +5658,7 @@ if (ltd != null && SessionState.get() != null) { SessionState.get().getLineageState() - .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator) output); + .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output); } if (LOG.isDebugEnabled()) { Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy) @@ -1948,7 +1948,7 @@ prop.setProperty("columns", colTypes[0]); prop.setProperty("columns.types", colTypes[1]); prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName()); - FetchWork fetch = new FetchWork(ctx.getResFile().toString(), new TableDesc( + FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc( TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1); fetch.setSerializationNullFormat(" "); return (FetchTask) TaskFactory.get(fetch, conf); Index: ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java (working copy) @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -42,7 +43,7 @@ * time and is 
then later used to created the mapping from * movetask to the set of filesink operators. */ - private final Map dirToFop; + private final Map dirToFop; /** * The lineage context index for this query. @@ -59,7 +60,7 @@ * Constructor. */ public LineageState() { - dirToFop = new HashMap(); + dirToFop = new HashMap(); linfo = new LineageInfo(); } @@ -69,7 +70,7 @@ * @param dir The directory name. * @param fop The file sink operator. */ - public void mapDirToFop(String dir, FileSinkOperator fop) { + public void mapDirToFop(Path dir, FileSinkOperator fop) { dirToFop.put(dir, fop); } @@ -80,7 +81,7 @@ * @param dc The associated data container. * @param cols The list of columns. */ - public void setLineage(String dir, DataContainer dc, + public void setLineage(Path dir, DataContainer dc, List cols) { // First lookup the file sink operator from the load work. FileSinkOperator fop = dirToFop.get(dir); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -299,7 +299,7 @@ if (iterPath == null) { if (work.isNotPartitioned()) { if (!tblDataDone) { - currPath = work.getTblDirPath(); + currPath = work.getTblDir(); currTbl = work.getTblDesc(); if (isNativeTable) { FileSystem fs = currPath.getFileSystem(job); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -702,6 +702,20 @@ output.writeString(token.getText()); } } + + private static class PathSerializer extends com.esotericsoftware.kryo.Serializer { + + @Override + public void write(Kryo kryo, Output output, Path path) { + 
output.writeString(path.toUri().toString()); + } + + @Override + public Path read(Kryo kryo, Input input, Class type) { + return new Path(URI.create(input.readString())); + } + } + private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) { PerfLogger perfLogger = PerfLogger.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); @@ -843,6 +857,7 @@ kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); kryo.register(java.sql.Date.class, new SqlDateSerializer()); kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); + kryo.register(Path.class, new PathSerializer()); removeField(kryo, Operator.class, "colExprMap"); removeField(kryo, ColumnInfo.class, "objectInspector"); removeField(kryo, MapWork.class, "opParseCtxMap"); @@ -864,6 +879,7 @@ kryo.register(CommonToken.class, new CommonTokenSerializer()); kryo.register(java.sql.Date.class, new SqlDateSerializer()); kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); + kryo.register(Path.class, new PathSerializer()); return kryo; }; }; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (working copy) @@ -205,7 +205,7 @@ LoadFileDesc lfd = work.getLoadFileWork(); if (lfd != null) { Path targetPath = lfd.getTargetDir(); - Path sourcePath = new Path(lfd.getSourceDir()); + Path sourcePath = lfd.getSourcePath(); moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); } @@ -241,7 +241,7 @@ mesg.setLength(mesg.length()-2); mesg.append(')'); } - String mesg_detail = " from " + tbd.getSourceDir(); + String mesg_detail = " from " + tbd.getSourcePath(); console.printInfo(mesg.toString(), mesg_detail); Table table = db.getTable(tbd.getTable().getTableName()); @@ -281,7 +281,7 @@ DataContainer dc = null; 
if (tbd.getPartitionSpec().size() == 0) { dc = new DataContainer(table.getTTable()); - db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable() + db.loadTable(tbd.getSourcePath(), tbd.getTable() .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime()); if (work.getOutputs() != null) { work.getOutputs().add(new WriteEntity(table)); @@ -294,7 +294,7 @@ List sortCols = null; int numBuckets = -1; Task task = this; - String path = tbd.getSourceDir(); + String path = tbd.getSourcePath().toUri().toString(); // Find the first ancestor of this MoveTask which is some form of map reduce task // (Either standard, local, or a merge) while (task.getParentTasks() != null && task.getParentTasks().size() == 1) { @@ -330,7 +330,7 @@ // condition for merging is not met, see GenMRFileSink1. if (task instanceof MoveTask) { if (((MoveTask)task).getWork().getLoadFileWork() != null) { - path = ((MoveTask)task).getWork().getLoadFileWork().getSourceDir(); + path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString(); } } } @@ -354,7 +354,7 @@ // want to isolate any potential issue it may introduce. 
ArrayList> dp = db.loadDynamicPartitions( - new Path(tbd.getSourceDir()), + tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), @@ -394,7 +394,7 @@ dc = new DataContainer(table.getTTable(), partn.getTPartition()); if (SessionState.get() != null) { - SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc, + SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } @@ -405,7 +405,7 @@ List partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); db.validatePartitionNameCharacters(partVals); - db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), + db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd)); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); @@ -422,7 +422,7 @@ } } if (SessionState.get() != null && dc != null) { - SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc, + SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } releaseLocks(tbd); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (revision 1553793) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (working copy) @@ -524,7 +524,7 @@ FetchWork fetchWork; if (!partDesc.isPartitioned()) { assert paths.size() == 1; - fetchWork = new FetchWork(paths.get(0), partDesc.getTableDesc()); + fetchWork = new FetchWork(onePath, partDesc.getTableDesc()); } else { fetchWork = new FetchWork(paths, parts, partDesc.getTableDesc()); }