Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 986523) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -239,6 +239,7 @@ HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000), HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M MAPREDMINSPLITSIZE("mapred.min.split.size", 1), + HIVEMERGEMAPONLY("hive.mergejob.maponly", true), HIVESENDHEARTBEAT("hive.heartbeat.interval", 1000), HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000), Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 986523) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -668,7 +668,7 @@ if (tasks != null) { File planDir = new File(outDir, "plan"); - File planFile = new File(planDir, tname.concat(".xml")); + String planFile = outPath(planDir.toString(), tname + ".xml"); File outf = null; outf = new File(logDir); @@ -690,7 +690,7 @@ + "\\|\\([0-9]\\{10\\}\\)" + "\\|\\(/.*/warehouse/.*\\)\\)"; cmdArray[4] = outf.getPath(); - cmdArray[5] = planFile.getPath(); + cmdArray[5] = planFile; System.out.println(cmdArray[0] + " " + cmdArray[1] + " " + cmdArray[2] + "\'" + cmdArray[3] + "\'" + " " + cmdArray[4] + " " + cmdArray[5]); @@ -708,7 +708,7 @@ if (exitVal != 0 && overWrite) { System.out.println("Overwriting results"); - String cmdLine = "cp " + outf.getPath() + " " + planFile.getPath(); + String cmdLine = "cp " + outf.getPath() + " " + planFile; executor = Runtime.getRuntime().exec(cmdLine); exitVal = executor.waitFor(); } @@ -887,10 +887,16 @@ } public ASTNode parseQuery(String tname) throws Exception { - return pd.parse(qMap.get(tname)); } + public void resetParser() throws SemanticException { + drv.init(); + pd = new ParseDriver(); + sem = new SemanticAnalyzer(conf); + } + + public List> analyzeAST(ASTNode ast) throws Exception { // Do semantic analysis and plan generation Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy) @@ -334,7 +334,7 @@ } /** - * + * * @param tableName * table name * @param indexName @@ -380,20 +380,20 @@ String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME; Index old_index = null; try { - old_index = getIndex(dbName, tableName, indexName); + old_index = getIndex(dbName, tableName, indexName); } catch (Exception e) { } if (old_index != null) { throw new HiveException("Index " + indexName + " already exists on table " + tableName + ", db=" + dbName); } - + org.apache.hadoop.hive.metastore.api.Table baseTbl = getMSC().getTable(dbName, tableName); if (baseTbl.getTableType() == TableType.VIRTUAL_VIEW.toString()) { throw new HiveException("tableName="+ tableName +" is a VIRTUAL VIEW. Index on VIRTUAL VIEW is not supported."); } - + if (indexTblName == null) { - indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName); + indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName); } else { org.apache.hadoop.hive.metastore.api.Table temp = null; try { @@ -404,11 +404,11 @@ throw new HiveException("Table name " + indexTblName + " already exists. 
Choose another name."); } } - + org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().clone(); SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo(); if(serde != null) { - serdeInfo.setSerializationLib(serde); + serdeInfo.setSerializationLib(serde); } else { if (storageHandler == null) { serdeInfo.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); @@ -437,7 +437,7 @@ if (lineDelim != null) { serdeInfo.getParameters().put(Constants.LINE_DELIM, lineDelim); } - + if (serdeProps != null) { Iterator> iter = serdeProps.entrySet() .iterator(); @@ -446,16 +446,16 @@ serdeInfo.getParameters().put(m.getKey(), m.getValue()); } } - + storageDescriptor.setLocation(null); if (location != null) { - storageDescriptor.setLocation(location); + storageDescriptor.setLocation(location); } storageDescriptor.setInputFormat(inputFormat); storageDescriptor.setOutputFormat(outputFormat); - + Map params = new HashMap(); - + List indexTblCols = new ArrayList(); List sortCols = new ArrayList(); storageDescriptor.setBucketCols(null); @@ -468,14 +468,15 @@ k++; } } - if (k != indexedCols.size()) + if (k != indexedCols.size()) { throw new RuntimeException( "Check the index columns, they should appear in the table being indexed."); - + } + storageDescriptor.setCols(indexTblCols); storageDescriptor.setSortCols(sortCols); - int time = (int) (System.currentTimeMillis() / 1000); + int time = (int) (System.currentTimeMillis() / 1000); org.apache.hadoop.hive.metastore.api.Table tt = null; HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass); @@ -489,18 +490,18 @@ if(!deferredRebuild) { throw new RuntimeException("Please specify deferred rebuild using \" WITH DEFERRED REBUILD \"."); } - + Index indexDesc = new Index(indexName, indexHandlerClass, dbName, tableName, time, time, indexTblName, storageDescriptor, params, deferredRebuild); indexHandler.analyzeIndexDefinition(baseTbl, indexDesc, tt); - + this.getMSC().createIndex(indexDesc, tt); - + } catch (Exception e) { throw new HiveException(e); } } - + public Index getIndex(String dbName, String baseTableName, String indexName) throws HiveException { try { @@ -509,7 +510,7 @@ throw new HiveException(e); } } - + public boolean dropIndex(String db_name, String tbl_name, String index_name, boolean deleteData) throws HiveException { try { return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData); @@ -519,7 +520,7 @@ throw new HiveException("Unknow error. Please check logs.", e); } } - + /** * Drops table along with the data in it. 
If the table doesn't exist * then it is a no-op @@ -812,6 +813,9 @@ FileSystem fs = loadPath.getFileSystem(conf); FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDP, fs); + if (status.length == 0) { + LOG.warn("No partition is generated by dynamic partitioning"); + } if (status.length > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) { throw new HiveException("Number of dynamic partitions created is " + status.length Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy) @@ -21,9 +21,12 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Stack; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; @@ -38,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; @@ -51,7 +53,9 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; import org.apache.hadoop.hive.ql.plan.ConditionalWork; +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; @@ -65,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** @@ -73,6 +76,8 @@ */ public class GenMRFileSink1 implements NodeProcessor { + static final private Log LOG = LogFactory.getLog(GenMRFileSink1.class.getName()); + public GenMRFileSink1() { } @@ -122,7 +127,7 @@ String finalName = processFS(nd, stack, opProcCtx, chDir); - // If it is a map-only job, insert a new task to do the concatenation + // need to merge the files in the destination table/partitions if (chDir && (finalName != null)) { createMergeJob((FileSinkOperator) nd, ctx, finalName); } @@ -130,8 +135,8 @@ return null; } - private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, - String finalName) throws SemanticException { + private void createMapReduce4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) + throws SemanticException { Task currTask = ctx.getCurrTask(); RowSchema fsRS = fsOp.getSchema(); @@ -140,6 +145,7 @@ keyCols.add(TypeCheckProcFactory.DefaultExprProcessor .getFuncExprNodeDesc("rand")); + // value is all the columns in the FileSink operator input ArrayList valueCols = new ArrayList(); for (ColumnInfo ci :
fsRS.getSignature()) { valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), @@ -178,7 +184,7 @@ pos = Integer.valueOf(pos.intValue() + 1); } - Operator extract = OperatorFactory.getAndMakeChild(new ExtractDesc( + Operator extract = OperatorFactory.getAndMakeChild(new ExtractDesc( new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(out_rwsch.getColumnInfos())); @@ -234,6 +240,169 @@ } } + /** + * Create a MapReduce job for a particular partition if the Hadoop version is pre-0.20, + * otherwise create a Map-only job using CombineHiveInputFormat for all partitions. + * @param fsOp The FileSink operator. + * @param ctx The MR processing context. + * @param finalName the final destination path the merge job should output. + * @throws SemanticException + */ + private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) + throws SemanticException { + + // if the Hadoop version supports CombineFileInputFormat (version >= 0.20), + // create a Map-only job for merge, otherwise create a MapReduce merge job. + ParseContext parseCtx = ctx.getParseCtx(); + if (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) && + Utilities.supportCombineFileInputFormat()) { + // create Map-only merge job + createMap4Merge(fsOp, ctx, finalName); + LOG.info("use CombineHiveInputFormat for the merge job"); + } else { + createMapReduce4Merge(fsOp, ctx, finalName); + LOG.info("use HiveInputFormat for the merge job"); + } + } + + private void createMap4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName) { + + // create a Map-only merge job with the following operators: + // + // MR job J0: + // ... + // | + // v + // FileSinkOperator_1 + // | + // v + // Merge job J1: + // | + // v + // TableScan (using CombineHiveInputFormat) + // | + // v + // FileSinkOperator + // + // Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do + // not contain the dynamic partitions (their parent). So after the dynamic partitions are + // created (after the first job finishes and before the moveTask or ConditionalTask starts), + // we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition + // directories. + // + + // + // 1.
create the operator tree + // + ParseContext parseCtx = ctx.getParseCtx(); + FileSinkDesc fsConf = fsOp.getConf(); + // Create a TableScan operator + RowSchema fsRS = fsOp.getSchema(); + Operator ts_op = OperatorFactory.get(TableScanDesc.class, fsRS); + + // Create a FileSink operator + ArrayList valueCols = new ArrayList(); + for (ColumnInfo ci : fsRS.getSignature()) { + valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), + ci.getTabAlias(), ci.getIsVirtualCol())); + } + ArrayList outputColumns = new ArrayList(); + for (int i = 0; i < valueCols.size(); i++) { + outputColumns.add(SemanticAnalyzer.getColumnInternalName(i)); + } + + TableDesc ts = (TableDesc) fsConf.getTableInfo().clone(); + /* + fsConf.getTableInfo().getProperties().remove( + org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS); + */ + FileSinkDesc fsDesc = new FileSinkDesc(finalName, ts, + parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT)); + FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( + fsDesc, fsRS, ts_op); + + // If the input FileSinkOperator has dynamic partitioning enabled, the TableScanOperator + // needs to include the partition columns, and the FileSinkOperator_2 should have + // a DynamicPartitionCtx to indicate it needs to be dynamically partitioned. + // NOTE: this changes the RowSchema of the TableScanOperator so this has to be + // done after the FileSinkOperator is created. + DynamicPartitionCtx dpCtx = fsConf.getDynPartCtx(); + if (dpCtx != null && dpCtx.getNumDPCols() > 0) { + // adding dp column info + ArrayList signature = fsRS.getSignature(); + String tblAlias = fsConf.getTableInfo().getTableName(); + LinkedHashMap colMap = new LinkedHashMap(); + for (String dpCol: dpCtx.getDPColNames()) { + ColumnInfo colInfo = new ColumnInfo(dpCol, TypeInfoFactory.stringTypeInfo, + tblAlias, true); + signature.add(colInfo); + colMap.put(dpCol, dpCol); + } + fsRS.setSignature(signature); + // + DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx); + dpCtx2.setInputToDPCols(colMap); + fsDesc.setDynPartCtx(dpCtx2); + } + + // + // 2. create the merge job J1 and make the TableScan operator the root of J1 + // + MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf()); + ArrayList aliases = new ArrayList(); + aliases.add(fsConf.getDirName()); + // using CombineHiveInputFormat + cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"); + cplan.getPathToAliases().put(fsConf.getDirName(), aliases); + cplan.getPathToPartitionInfo().put(fsConf.getDirName(), + new PartitionDesc(fsConf.getTableInfo(), null)); + cplan.setNumReduceTasks(Integer.valueOf(0)); // no reducers + cplan.getAliasToWork().put(fsConf.getDirName(), ts_op); + + // + // 3. create a conditional task J2 with J1 as one of the tasks in its list.
+ // + Task mergeTask = TaskFactory.get(cplan, parseCtx .getConf()); + MoveWork dummyMv = new MoveWork(null, null, null, + new LoadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false); + Task dummyMoveTask = TaskFactory.get(dummyMv, ctx.getConf()); + List listWorks = new ArrayList(); + listWorks.add(dummyMv); + listWorks.add(mergeTask.getWork()); + ConditionalWork cndWork = new ConditionalWork(listWorks); + + ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, ctx + .getConf()); + List> listTasks = new ArrayList>(); + listTasks.add(dummyMoveTask); + listTasks.add(mergeTask); + cndTsk.setListTasks(listTasks); + + cndTsk.setResolver(new ConditionalResolverMergeFiles()); + ConditionalResolverMergeFilesCtx mrCtx = new ConditionalResolverMergeFilesCtx(listTasks, fsOp + .getConf().getDirName()); + mrCtx.setDPCtx(fsConf.getDynPartCtx()); // remember whether it is dynamic partition or not + cndTsk.setResolverCtx(mrCtx); + + // + // 4. make the conditional task J2 a child of the current task + // + Task currTask = ctx.getCurrTask(); + currTask.addDependentTask(cndTsk); + + // + // 5. add the moveTask as a child of each task in the conditional task's list + // + List> mvTasks = ctx.getMvTask(); + Task mvTask = findMoveTask(mvTasks, newOutput); + + if (mvTask != null) { + for (Task tsk : cndTsk.getListTasks()) { + tsk.addDependentTask(mvTask); + } + } + } + private Task findMoveTask( List> mvTasks, FileSinkOperator fsOp) { // find the move task @@ -251,10 +420,19 @@ return mvTsk; } } - return null; } + /** + * Process the FileSink operator to generate a MoveTask if necessary. + * @param nd current FileSink operator + * @param stack parent operators + * @param opProcCtx + * @param chDir whether the operator output should first be written to a tmp dir and then merged + * into the final dir later + * @return the final file name to which the FileSinkOperator should write. + * @throws SemanticException + */ private String processFS(Node nd, Stack stack, NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy) @@ -27,8 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -151,6 +151,17 @@ } /** + * The default dependent tasks are just the child tasks, but other task types + * may override this (e.g. ConditionalTask uses its listTasks + * as dependents). + * + * @return a list of tasks that are dependent on this task. + */ + public List> getDependentTasks() { + return getChildTasks(); + } + + /** * Add a dependent task on the current task. Return if the dependency already
Return if the dependency already * existed or is this a new one * @@ -297,8 +308,9 @@ public final void localizeMRTmpFiles(Context ctx) { localizeMRTmpFilesImpl(ctx); - if (childTasks == null) + if (childTasks == null) { return; + } for (Task t: childTasks) { t.localizeMRTmpFiles(ctx); @@ -306,4 +318,3 @@ } } - \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (working copy) @@ -203,8 +203,15 @@ @Override protected void localizeMRTmpFilesImpl(Context ctx) { - if (getListTasks() != null) - for(Task t: getListTasks()) + if (getListTasks() != null) { + for(Task t: getListTasks()) { t.localizeMRTmpFiles(ctx); + } + } } + + @Override + public List> getDependentTasks() { + return listTasks; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (working copy) @@ -43,4 +43,14 @@ public ArrayList getSignature() { return signature; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder('('); + for (ColumnInfo col: signature) { + sb.append(col.toString()); + } + sb.append(')'); + return sb.toString(); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -89,6 +89,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; @@ -1416,13 +1417,13 @@ mrTasks.add((ExecDriver)task); } - if (task instanceof ConditionalTask) { - getMRTasks(((ConditionalTask)task).getListTasks(), mrTasks); + if (task.getDependentTasks() != null) { + getMRTasks(task.getDependentTasks(), mrTasks); } - - if (task.getChildTasks() != null) { - getMRTasks(task.getChildTasks(), mrTasks); - } } } + + public static boolean supportCombineFileInputFormat() { + return ShimLoader.getHadoopShims().getCombineFileInputFormat() != null; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (working copy) @@ -21,13 +21,17 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; /** * Conditional task 
resolution interface. This is invoked at run time to get the @@ -48,6 +52,7 @@ private static final long serialVersionUID = 1L; List> listTasks; private String dir; + private DynamicPartitionCtx dpCtx; // merge task could be after dynamic partition insert public ConditionalResolverMergeFilesCtx() { } @@ -90,6 +95,14 @@ public void setListTasks(List> listTasks) { this.listTasks = listTasks; } + + public DynamicPartitionCtx getDPCtx() { + return dpCtx; + } + + public void setDPCtx(DynamicPartitionCtx dp) { + dpCtx = dp; + } } public List> getTasks(HiveConf conf, @@ -119,15 +132,56 @@ long currAvgSz = totalSz / fStats.length; if ((currAvgSz < avgConditionSize) && (fStats.length > 1)) { + // + // for each dynamic partition, generate a merge task + // populate aliasToWork, pathToPartitionInfo, pathToAlias // also set the number of reducers + // Task tsk = ctx.getListTasks().get(1); MapredWork work = (MapredWork) tsk.getWork(); - int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); - int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize); - reducers = Math.max(1, reducers); - reducers = Math.min(maxReducers, reducers); - work.setNumReduceTasks(reducers); + + // Dynamic partition: replace input path (root to dp paths) with dynamic partition + // input paths. + DynamicPartitionCtx dpCtx = ctx.getDPCtx(); + if (dpCtx != null && dpCtx.getNumDPCols() > 0) { + FileStatus[] status = Utilities.getFileStatusRecurse(dirPath, + dpCtx.getNumDPCols(), inpFs); + + // cleanup pathToPartitionInfo + Map ptpi = work.getPathToPartitionInfo(); + assert ptpi.size() == 1; + String path = ptpi.keySet().iterator().next(); + TableDesc tblDesc = ptpi.get(path).getTableDesc(); + ptpi.remove(path); // the root path is not useful anymore + + // cleanup pathToAliases + Map> pta = work.getPathToAliases(); + assert pta.size() == 1; + path = pta.keySet().iterator().next(); + ArrayList aliases = pta.get(path); + pta.remove(path); // the root path is not useful anymore + + // populate pathToPartitionInfo and pathToAliases w/ DP paths + for (int i = 0; i < status.length; ++i) { + work.getPathToAliases().put(status[i].getPath().toString(), aliases); + // get the full partition spec from the path and update the PartitionDesc + Map fullPartSpec = new LinkedHashMap( + dpCtx.getPartSpec()); + Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath()); + PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec); + work.getPathToPartitionInfo().put( + status[i].getPath().toString(), + pDesc); + } + } else { + int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); + int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize); + reducers = Math.max(1, reducers); + reducers = Math.min(maxReducers, reducers); + work.setNumReduceTasks(reducers); + } + resTsks.add(tsk); return resTsks; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (working copy) @@ -78,6 +78,20 @@ } } + public DynamicPartitionCtx(DynamicPartitionCtx dp) { + this.partSpec = dp.partSpec; + this.numDPCols = dp.numDPCols; + this.numSPCols = dp.numSPCols; + this.spPath = dp.spPath; + this.rootPath = dp.rootPath; + this.numBuckets = dp.numBuckets; + this.inputToDPCols = dp.inputToDPCols; + this.spNames = dp.spNames; + this.dpNames = dp.dpNames; + this.defaultPartName = 
dp.defaultPartName; + this.maxPartsPerNode = dp.maxPartsPerNode; + } + public void mapInputToDP(List fs) { assert fs.size() == this.numDPCols: "input DP column size != numDPCols"; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 986523) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -32,9 +32,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; -import java.util.Map.Entry; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; import org.apache.hadoop.hive.ql.optimizer.GenMROperator; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; +import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; @@ -103,7 +104,6 @@ import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.Optimizer; -import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; @@ -123,6 +123,7 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.plan.ForwardDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; @@ -144,12 +145,11 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; -import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -157,9 +157,9 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -3273,10 
+3273,12 @@ } if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP - // TODO: we should support merge files for dynamically generated partitions later if (dpCtx.getNumDPCols() > 0 && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) || - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES))) { + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) && + Utilities.supportCombineFileInputFormat() == false) { + // Do not support merge for Hadoop versions (pre-0.20) that do not + // support CombineHiveInputFormat HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); } @@ -6043,12 +6045,12 @@ private void getLeafTasks(Task task, HashSet> leaves) { - if (task.getChildTasks() == null) { + if (task.getDependentTasks() == null) { if (!leaves.contains(task)) { leaves.add(task); } } else { - getLeafTasks(task.getChildTasks(), leaves); + getLeafTasks(task.getDependentTasks(), leaves); } }
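
Note on the Task/ConditionalTask hunks above: the patch replaces the instanceof-based traversal (which special-cased ConditionalTask) with a polymorphic getDependentTasks() that defaults to the child tasks and is overridden by ConditionalTask to return its listTasks. The standalone sketch below illustrates that traversal pattern with simplified toy classes; ToyTask, ToyConditionalTask, and collectLeaves are hypothetical names, not Hive's actual API.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for Hive's Task (hypothetical, for illustration only).
class ToyTask {
  private final String name;
  private final List<ToyTask> childTasks = new ArrayList<ToyTask>();

  ToyTask(String name) { this.name = name; }

  void addChild(ToyTask t) { childTasks.add(t); }

  List<ToyTask> getChildTasks() {
    return childTasks.isEmpty() ? null : childTasks;
  }

  // Default: dependents are just the child tasks (mirrors Task.getDependentTasks()).
  List<ToyTask> getDependentTasks() {
    return getChildTasks();
  }

  @Override
  public String toString() { return name; }
}

// Mirrors ConditionalTask: its dependents are the tasks in its list, not its children.
class ToyConditionalTask extends ToyTask {
  private final List<ToyTask> listTasks;

  ToyConditionalTask(String name, List<ToyTask> listTasks) {
    super(name);
    this.listTasks = listTasks;
  }

  @Override
  List<ToyTask> getDependentTasks() {
    return listTasks;
  }
}

public class DependentTaskDemo {
  // Same shape as getLeafTasks after the patch: recurse through getDependentTasks()
  // so conditional branches are visited without any instanceof checks.
  static void collectLeaves(ToyTask task, Set<ToyTask> leaves) {
    List<ToyTask> deps = task.getDependentTasks();
    if (deps == null) {
      leaves.add(task);
    } else {
      for (ToyTask d : deps) {
        collectLeaves(d, leaves);
      }
    }
  }

  public static void main(String[] args) {
    ToyTask move = new ToyTask("dummyMoveTask");
    ToyTask merge = new ToyTask("mergeTask");
    List<ToyTask> branches = new ArrayList<ToyTask>();
    branches.add(move);
    branches.add(merge);
    ToyTask conditional = new ToyConditionalTask("conditionalMerge", branches);

    ToyTask root = new ToyTask("mapRedTask");
    root.addChild(conditional);

    Set<ToyTask> leaves = new LinkedHashSet<ToyTask>();
    collectLeaves(root, leaves);
    System.out.println(leaves); // [dummyMoveTask, mergeTask]
  }
}

With this shape, Utilities.getMRTasks and SemanticAnalyzer.getLeafTasks (as modified in the hunks above) can walk conditional branches without knowing about ConditionalTask explicitly.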
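
The ConditionalResolverMergeFiles hunk decides at run time whether the merge job is worth running: it sums the sizes of the files produced by the first job and only schedules the merge task when the average file size falls below the configured small-file threshold and more than one file exists; in the non-dynamic-partition branch it also derives the number of reducers from the configured per-task merge size. Below is a minimal standalone sketch of that sizing arithmetic, using plain long sizes instead of FileStatus objects; shouldMerge and mergeReducers are hypothetical names, and the threshold values stand in for Hive's merge-related settings.

public class MergeDecisionSketch {

  /** True when the produced files are, on average, small enough to be worth merging. */
  static boolean shouldMerge(long[] fileSizes, long avgConditionSize) {
    if (fileSizes.length <= 1) {
      return false;               // nothing to merge
    }
    long totalSz = 0;
    for (long sz : fileSizes) {
      totalSz += sz;
    }
    long currAvgSz = totalSz / fileSizes.length;
    return currAvgSz < avgConditionSize;
  }

  /** Number of reducers for the MapReduce merge job, bounded by maxReducers (non-DP case). */
  static int mergeReducers(long totalSz, long trgtSize, int maxReducers) {
    int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize);  // ceiling division
    reducers = Math.max(1, reducers);
    return Math.min(maxReducers, reducers);
  }

  public static void main(String[] args) {
    long[] sizes = {4L << 20, 6L << 20, 5L << 20};   // three ~5MB output files
    long avgConditionSize = 16L << 20;                // e.g. 16MB small-file threshold
    long trgtSize = 256L << 20;                       // e.g. 256MB target size per reducer

    System.out.println("merge? " + shouldMerge(sizes, avgConditionSize));        // true
    System.out.println("reducers: " + mergeReducers(15L << 20, trgtSize, 999));  // 1
  }
}

In the dynamic-partition branch the patch instead enumerates the partition directories with Utilities.getFileStatusRecurse and rewrites pathToAliases/pathToPartitionInfo to point at each partition path, as shown in the hunk above.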