Index: ql/src/test/queries/clientpositive/ppr_pushdown.q
===================================================================
--- ql/src/test/queries/clientpositive/ppr_pushdown.q	(revision 1368629)
+++ ql/src/test/queries/clientpositive/ppr_pushdown.q	(working copy)
@@ -19,7 +19,10 @@
 insert overwrite table ppr_test partition(ds = '12*4') select * from (select '12*4' from src limit 1 union all select 'abcd' from src limit 1) s;
 
 
+explain extended select * from ppr_test where ds = '1234' order by key;
+
+select * from ppr_test where ds = '1234' order by key;
 select * from ppr_test where ds = '1224' order by key;
 select * from ppr_test where ds = '1214' order by key;
 select * from ppr_test where ds = '12.4' order by key;
Index: ql/src/test/queries/clientpositive/partition_special_char.q
===================================================================
--- ql/src/test/queries/clientpositive/partition_special_char.q	(revision 1368629)
+++ ql/src/test/queries/clientpositive/partition_special_char.q	(working copy)
@@ -1,19 +1,14 @@
+
 create table sc as select *
 from (select '2011-01-11', '2011-01-11+14:18:26' from src limit 1
       union all
-      select '2011-01-11', '2011-01-11+15:18:26' from src limit 1
-      union all
       select '2011-01-11', '2011-01-11+16:18:26' from src limit 1 ) s;
 
-create table sc_part (key string) partitioned by (ts string) stored as rcfile;
+create table sc_part (key string, key2 string) stored as rcfile;
 
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
 
-insert overwrite table sc_part partition(ts) select * from sc;
-show partitions sc_part;
-select count(*) from sc_part where ts is not null;
+insert overwrite table sc_part select * from sc;
 
-insert overwrite table sc_part partition(ts) select * from sc;
-show partitions sc_part;
-select count(*) from sc_part where ts is not null;
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java	(working copy)
@@ -591,8 +591,12 @@
       srcDir = mvWork.getLoadTableWork().getSourceDir();
     }
 
+    String fsOpDirName = fsOp.getConf().getDirName();
+    if (fsOp.getConf().isSubDir()) {
+      fsOpDirName = fsOp.getConf().getParentDir();
+    }
     if ((srcDir != null)
-        && (srcDir.equalsIgnoreCase(fsOp.getConf().getDirName()))) {
+        && (srcDir.equalsIgnoreCase(fsOpDirName))) {
       return mvTsk;
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java	(working copy)
@@ -35,6 +35,7 @@
  * Implementation of the optimizer.
  */
 public class Optimizer {
+  private ParseContext pctx;
   private List transformations;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java	(working copy)
@@ -24,11 +24,14 @@
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -39,8 +42,8 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
+import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -241,6 +244,7 @@
   public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx,
       Object... nodeOutputs) throws SemanticException {
     UnionOperator union = (UnionOperator) nd;
+    assert union.getParentOperators() != null;
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
     UnionProcContext uCtx = parseCtx.getUCtx();
@@ -279,6 +283,7 @@
       uTask = uCtxTask.getUTask();
     }
 
+    boolean simpleUnion = false;
     // Copy into the current union task plan if
     if (uPrsCtx.getMapOnlySubq(pos) && !uPrsCtx.getMapJoinSubq(pos)
         && uPrsCtx.getRootTask(pos)) {
@@ -286,29 +291,97 @@
     }
     // If it a map-reduce job, create a temporary file
     else {
-      // is the current task a root task
-      if (shouldBeRootTask(currTask, parseCtx)
-          && (!ctx.getRootTasks().contains(currTask))) {
-        ctx.getRootTasks().add(currTask);
+
+      // If we can directly write the results into the final directory, then don't create a
+      // new task
+      simpleUnion = processSimpleUnion(union, uPrsCtx);
+      if (!simpleUnion) {
+        // is the current task a root task
+        if (shouldBeRootTask(currTask, parseCtx)
+            && (!ctx.getRootTasks().contains(currTask))) {
+          ctx.getRootTasks().add(currTask);
+        }
+        // If there is a mapjoin at position 'pos'
+        if (uPrsCtx.getMapJoinSubq(pos)) {
+          processSubQueryUnionMapJoin(ctx);
+        }
+
+        processSubQueryUnionCreateIntermediate(union.getParentOperators().get(pos),
+            union, uTask, ctx, uCtxTask);
+        //the currAliasId and CurrTopOp is not valid any more
+        ctx.setCurrAliasId(null);
+        ctx.setCurrTopOp(null);
+        ctx.getOpTaskMap().put(null, uTask);
       }
-      // If there is a mapjoin at position 'pos'
-      if (uPrsCtx.getMapJoinSubq(pos)) {
-        processSubQueryUnionMapJoin(ctx);
+      else {
+        ctx.setCurrUnionOp(null);
+        uCtx.setUnionParseContext(union, null);
+        union.setParentOperators(null);
+      }
     }
 
-      processSubQueryUnionCreateIntermediate(union.getParentOperators().get(pos),
-          union, uTask, ctx, uCtxTask);
-      //the currAliasId and CurrTopOp is not valid any more
-      ctx.setCurrAliasId(null);
-      ctx.setCurrTopOp(null);
-      ctx.getOpTaskMap().put(null, uTask);
+    if (!simpleUnion) {
+      ctx.setCurrTask(uTask);
+      mapCurrCtx.put((Operator) nd,
+          new GenMapRedCtx(ctx.getCurrTask(), null, null));
     }
+    return null;
+  }
 
-    ctx.setCurrTask(uTask);
+  /**
+   * If the union operator is followed by a select * and a file sink,
+   * then write directly to the final location, instead of creating a new task.
+   * @param union the union operator being processed
+   * @param uPrsCtx the union parse context, which describes the union's inputs
+   * @return true if the rewrite was applied, false otherwise
+   */
+  private boolean processSimpleUnion(UnionOperator union, UnionParseContext uPrsCtx) {
-
+    for (int pos = 0; pos < uPrsCtx.getNumInputs(); pos++) {
+      // If at least one parent doesn't create temporary files, then we can't apply
+      // this optimization
+      if (uPrsCtx.getMapOnlySubq(pos)
+          && !uPrsCtx.getMapJoinSubq(pos) && uPrsCtx.getRootTask(pos)) {
+        return false;
+      }
+    }
 
-    mapCurrCtx.put((Operator) nd,
-        new GenMapRedCtx(ctx.getCurrTask(), null, null));
-    return null;
+    // We need a union operator followed by a select star followed by a file sink
+    assert union.getChildOperators().size() == 1;
+    Operator unionChild = union.getChildOperators().get(0);
+    if (!(unionChild instanceof SelectOperator)) {
+      return false;
+    }
+    if (!((SelectOperator) unionChild).getConf().isSelectStar()) {
+      return false;
+    }
+    List<Operator<? extends Serializable>> selectChildren = unionChild.getChildOperators();
+    if (selectChildren.size() != 1 || !(selectChildren.get(0) instanceof FileSinkOperator)) {
+      return false;
+    }
+
+    // Now we can apply the optimization
+    FileSinkOperator fileSinkOp = (FileSinkOperator) selectChildren.get(0);
+    String parentDirName = fileSinkOp.getConf().getDirName();
+    for (Operator parent : union.getParentOperators()) {
+      // Clone the fileSinkDesc of the final fileSink and create similar fileSinks at
+      // each parent
+      FileSinkDesc fileSinkDesc = null;
+      try {
+        fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone();
+      } catch (CloneNotSupportedException e) {
+      }
+
+      String dirName = parentDirName + Path.SEPARATOR + parent.getIdentifier();
+      fileSinkDesc.setDirName(dirName);
+      fileSinkDesc.setSubDir(true);
+      fileSinkDesc.setParentDir(parentDirName);
+      parent.setChildOperators(null);
+      Operator tmpFileSinkOp =
+        OperatorFactory.getAndMakeChild(fileSinkDesc, parent);
+      tmpFileSinkOp.setChildOperators(null);
+    }
+    return true;
   }
 
   private boolean shouldBeRootTask(
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMROperator.java	(working copy)
@@ -39,7 +39,7 @@
 
   /**
    * Reduce Scan encountered.
-   * 
+   *
    * @param nd
    *          the reduce sink operator encountered
    * @param procCtx
@@ -52,9 +52,11 @@
     Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx
         .getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(stack.get(stack.size() - 2));
-    mapCurrCtx.put((Operator) nd, new GenMapRedCtx(
-        mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
-        .getCurrAliasId()));
+    if (mapredCtx != null) {
+      mapCurrCtx.put((Operator) nd, new GenMapRedCtx(
+          mapredCtx.getCurrTask(), mapredCtx.getCurrTopOp(), mapredCtx
+          .getCurrAliasId()));
+    }
     return null;
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java	(working copy)
@@ -28,7 +28,7 @@
  *
 */
 @Explain(displayName = "File Output Operator")
-public class FileSinkDesc implements Serializable {
+public class FileSinkDesc implements Serializable, Cloneable {
   private static final long serialVersionUID = 1L;
   private String dirName;
   // normally statsKeyPref will be the same as dirName, but the latter
@@ -46,6 +46,11 @@
   private DynamicPartitionCtx dpCtx;
   private String staticSpec; // static partition spec ends with a '/'
   private boolean gatherStats;
+  // Certain unions are converted to direct fileSinks into the subdirectories
+  // of the union's parents. Such fileSinks are marked with subDir, and the
+  // parent directory is stored in parentDir.
+  private boolean subDir;
+  private String parentDir;
   private boolean statsReliable;
 
   public FileSinkDesc() {
@@ -80,6 +85,19 @@
     this.partitionCols = null;
   }
 
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
+        destTableId, multiFileSpray, numFiles, totalFiles,
+        partitionCols, dpCtx);
+    ret.setCompressCodec(compressCodec);
+    ret.setCompressType(compressType);
+    ret.setGatherStats(gatherStats);
+    ret.setStaticSpec(staticSpec);
+    ret.setStatsAggPrefix(statsKeyPref);
+    return (Object) ret;
+  }
+
   @Explain(displayName = "directory", normalExplain = false)
   public String getDirName() {
     return dirName;
@@ -249,6 +267,22 @@
     }
   }
 
+  public boolean isSubDir() {
+    return subDir;
+  }
+
+  public void setSubDir(boolean subDir) {
+    this.subDir = subDir;
+  }
+
+  public String getParentDir() {
+    return parentDir;
+  }
+
+  public void setParentDir(String parentDir) {
+    this.parentDir = parentDir;
+  }
+
   public boolean isStatsReliable() {
     return statsReliable;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java	(working copy)
@@ -36,7 +36,7 @@
 
   /**
    * Constructor.
-   * 
+   *
    * @param defaultProc
    *          default processor to be fired if no rule matches
    * @param rules
@@ -53,7 +53,7 @@
 
   /**
    * Dispatcher function.
-   * 
+   *
    * @param nd
    *          operator to process
    * @param ndStack
Index: ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java	(working copy)
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
@@ -43,7 +42,7 @@
 
   /**
    * Constructor.
-   * 
+   *
    * @param disp
    *          dispatcher to call for each op encountered
   */
@@ -68,7 +67,7 @@
 
   /**
    * Dispatch the current operator.
-   * 
+   *
    * @param nd
    *          node being walked
    * @param ndStack
@@ -91,7 +90,7 @@
 
   /**
    * starting point for walking.
-   * 
+   *
    * @throws SemanticException
   */
   public void startWalking(Collection startNodes,
@@ -108,12 +107,13 @@
 
   /**
    * walk the current operator and its descendants.
-   * 
+   *
    * @param nd
    *          current operator in the graph
    * @throws SemanticException
   */
   public void walk(Node nd) throws SemanticException {
+
     if (opStack.empty() || nd != opStack.peek()) {
       opStack.push(nd);
     }
@@ -122,7 +122,7 @@
         || getDispatchedList().containsAll(nd.getChildren())) {
       // all children are done or no need to walk the children
       if (!getDispatchedList().contains(nd)) {
-        dispatch(nd, opStack); 
+        dispatch(nd, opStack);
       }
       opStack.pop();
       return;
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java	(working copy)
@@ -31,13 +31,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,7 +63,6 @@
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -1867,11 +1866,11 @@
           // calling loadTable/Partition. But leaving it in just in case.
           fs.delete(itemStaging, true);
           continue;
-        }
+        }/*
         if (item.isDir()) {
           throw new HiveException("checkPaths: " + src.getPath()
               + " has nested directory" + itemStaging);
-        }
+        }*/
         if (!replace) {
           // It's possible that the file we're copying may have the same
           // relative name as an existing file in the "destf" directory.
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(working copy)
@@ -122,7 +122,7 @@
    * Append a subdirectory to the tmp path.
    *
    * @param dp
-   *          subdirecgtory name
+   *          subdirectory name
    */
   public void appendTmpPath(String dp) {
     tmpPath = new Path(tmpPath, dp);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(working copy)
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -70,6 +71,7 @@
   private PartitionDesc currPart;
   private TableDesc currTbl;
   private boolean tblDataDone;
+  private HashSet<Path> doneTables;
 
   private transient RecordReader currRecReader;
   private transient InputSplit[] inputSplits;
@@ -199,31 +201,64 @@
     return partValues;
   }
 
-  private void getNextPath() throws Exception {
-    // first time
-    if (iterPath == null) {
-      if (work.getTblDir() != null) {
+
+  /**
+   * Update currPath with the appropriate path to the table. If the given TblDirPath is
+   * non-empty, it becomes currPath. Otherwise, currPath is set to the next non-empty,
+   * not yet visited child directory of TblDirPath.
+   */
+  private void getTableInfo() throws Exception {
+    currPath = work.getTblDirPath();
+    currTbl = work.getTblDesc();
+    if (isNativeTable) {
+      if (doneTables == null) {
+        doneTables = new HashSet<Path>();
+      }
+      FileSystem fs = currPath.getFileSystem(job);
+      if (fs.exists(currPath)) {
+        FileStatus[] fStats = listStatusUnderPath(fs, currPath);
+        for (FileStatus fStat : fStats) {
+          if (!fStat.isDir() && fStat.getLen() > 0 && !doneTables.contains(currPath)) {
+            tblDataDone = true;
+            doneTables.add(currPath);
+            break;
+          }
+        }
+        if (!tblDataDone) {
-        currPath = work.getTblDirPath();
-        currTbl = work.getTblDesc();
-        if (isNativeTable) {
-          FileSystem fs = currPath.getFileSystem(job);
-          if (fs.exists(currPath)) {
-            FileStatus[] fStats = listStatusUnderPath(fs, currPath);
-            for (FileStatus fStat : fStats) {
-              if (fStat.getLen() > 0) {
-                tblDataDone = true;
-                break;
+          for (FileStatus fStat : fStats) {
+            if (fStat.isDir() && !doneTables.contains(fStat.getPath())) {
+              boolean nonEmpty = false;
+              for (FileStatus fStatChild : listStatusUnderPath(fs, fStat.getPath())) {
+                if (!fStatChild.isDir() && fStatChild.getLen() > 0) {
+                  nonEmpty = true;
                }
              }
+              if (nonEmpty) {
+                currPath = fStat.getPath();
+                doneTables.add(fStat.getPath());
+                tblDataDone = true;
+                break;
+              }
            }
-          } else {
-            tblDataDone = true;
          }
+        }
+      }
+    } else {
+      tblDataDone = true;
+    }
 
-        if (!tblDataDone) {
-          currPath = null;
-        }
+    if (!tblDataDone) {
+      currPath = null;
+    }
+  }
+
+  private void getNextPath() throws Exception {
+    // first time
+    if (iterPath == null) {
+      if (work.getTblDir() != null) {
+        if (!tblDataDone) {
+          getTableInfo();
+          tblDataDone = false;
          return;
        } else {
          currTbl = null;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(revision 1368629)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(working copy)
@@ -875,7 +875,6 @@
       s.append(" ");
       for (Operator o : parentOperators) {
         s.append("Id = " + o.id + " ");
-        s.append(o.dump(level, seenOpts));
       }
       s.append("<\\Parent>");
     }
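
Illustrative note (not part of the patch): the rewrite performed by processSimpleUnion hinges on one shape check, namely that the union's only child is a SELECT * whose only child is a file sink; each union parent then gets its own file sink writing into a sub-directory of the final location. The following minimal, self-contained Java sketch mirrors that check using simplified stand-in classes (Op, SelectOp, FileSinkOp and isSimpleUnionShape are hypothetical names for illustration, not Hive's real Operator hierarchy):

import java.util.Collections;
import java.util.List;

class UnionShapeSketch {
  // Simplified stand-ins for Hive's operator tree; names are illustrative only.
  static class Op {
    List<Op> children = Collections.emptyList();
  }
  static class SelectOp extends Op {
    final boolean selectStar;
    SelectOp(boolean selectStar) { this.selectStar = selectStar; }
  }
  static class FileSinkOp extends Op {
    final String dirName;
    FileSinkOp(String dirName) { this.dirName = dirName; }
  }

  // True when the union feeds exactly one SELECT * which feeds exactly one file sink.
  static boolean isSimpleUnionShape(Op union) {
    if (union.children.size() != 1) {
      return false;
    }
    Op child = union.children.get(0);
    if (!(child instanceof SelectOp) || !((SelectOp) child).selectStar) {
      return false;
    }
    return child.children.size() == 1 && child.children.get(0) instanceof FileSinkOp;
  }

  public static void main(String[] args) {
    FileSinkOp sink = new FileSinkOp("/tmp/final");
    SelectOp select = new SelectOp(true);
    select.children = Collections.<Op>singletonList(sink);
    Op union = new Op();
    union.children = Collections.<Op>singletonList(select);
    // Prints true; each union parent would then write under "/tmp/final/<parent id>".
    System.out.println(isSimpleUnionShape(union));
  }
}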