Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1052182)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -256,7 +256,7 @@
     HIVECONVERTJOIN("hive.auto.convert.join", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 1000000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
-    HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432), //32M
+    HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
     MAPREDMINSPLITSIZE("mapred.min.split.size", 1),
     HIVEMERGEMAPONLY("hive.mergejob.maponly", true),
Index: ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out
===================================================================
--- ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out (revision 0)
+++ ql/src/test/results/clientpositive/merge_dynamic_partition2.q.out (revision 0)
@@ -0,0 +1,163 @@
+PREHOOK: query: create table srcpart_merge_dp like srcpart
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table srcpart_merge_dp like srcpart
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcpart_merge_dp
+PREHOOK: query: create table merge_dynamic_part like srcpart
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table merge_dynamic_part like srcpart
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@merge_dynamic_part
+PREHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: query: load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: query: load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+PREHOOK: type: LOAD
+POSTHOOK: query: load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12)
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: query: explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF srcpart_merge_dp)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB merge_dynamic_part (TOK_PARTSPEC (TOK_PARTVAL ds '2008-04-08') (TOK_PARTVAL hr)))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_TABLE_OR_COL hr))) (TOK_WHERE (= (TOK_TABLE_OR_COL ds) '2008-04-08'))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-5 depends on stages: Stage-1 , consists of Stage-4, Stage-3
+  Stage-4
+  Stage-0 depends on stages: Stage-4, Stage-3
+  Stage-2 depends on stages: Stage-0
+  Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        srcpart_merge_dp
+          TableScan
+            alias: srcpart_merge_dp
+            Filter Operator
+              predicate:
+                  expr: (ds = '2008-04-08')
+                  type: boolean
+              Select Operator
+                expressions:
+                      expr: key
+                      type: string
+                      expr: value
+                      type: string
+                      expr: hr
+                      type: string
+                outputColumnNames: _col0, _col1, _col2
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 1
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: merge_dynamic_part
+
+  Stage: Stage-5
+    Conditional Operator
+
+  Stage: Stage-4
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: pfile:/data/users/nzhang/work/2/apache-hive/build/ql/scratchdir/hive_2010-12-15_11-09-33_994_7142549978626881266/-ext-10000
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 2008-04-08
+            hr
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: merge_dynamic_part
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+        pfile:/data/users/nzhang/work/2/apache-hive/build/ql/scratchdir/hive_2010-12-15_11-09-33_994_7142549978626881266/-ext-10002
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: merge_dynamic_part
+
+
+PREHOOK: query: insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+PREHOOK: Output: default@merge_dynamic_part@ds=2008-04-08
+POSTHOOK: query: insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart_merge_dp@ds=2008-04-08/hr=12
+POSTHOOK: Output: default@merge_dynamic_part@ds=2008-04-08/hr=11
+POSTHOOK: Output: default@merge_dynamic_part@ds=2008-04-08/hr=12
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: show table extended like `merge_dynamic_part`
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: show table extended like `merge_dynamic_part`
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).key SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: merge_dynamic_part PARTITION(ds=2008-04-08,hr=12).value SIMPLE [(srcpart_merge_dp)srcpart_merge_dp.FieldSchema(name:value, type:string, comment:default), ]
+tableName:merge_dynamic_part
+owner:null
+location:pfile:/data/users/nzhang/work/2/apache-hive/build/ql/test/data/warehouse/merge_dynamic_part
+inputformat:org.apache.hadoop.mapred.TextInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+columns:struct columns { string key, string value}
+partitioned:true
+partitionColumns:struct partition_columns { string ds, string hr}
+totalNumberFiles:3
+totalFileSize:17415
+maxFileSize:5901
+minFileSize:5702
+lastAccessTime:0
+lastUpdateTime:1292440184000
+
Index: ql/src/test/queries/clientpositive/merge_dynamic_partition2.q
===================================================================
--- ql/src/test/queries/clientpositive/merge_dynamic_partition2.q (revision 0)
+++ ql/src/test/queries/clientpositive/merge_dynamic_partition2.q (revision 0)
@@ -0,0 +1,27 @@
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+create table srcpart_merge_dp like srcpart;
+
+create table merge_dynamic_part like srcpart;
+
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=11);
+load data local inpath '../data/files/srcbucket0.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+load data local inpath '../data/files/srcbucket1.txt' INTO TABLE srcpart_merge_dp partition(ds='2008-04-08', hr=12);
+
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+set hive.merge.smallfiles.avgsize=3000;
+set hive.exec.compress.output=false;
+
+explain
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';
+insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';
+
+show table extended like `merge_dynamic_part`;
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy)
@@ -246,7 +246,7 @@
 
     // NOTE: we should gather stats in MR1 (rather than the merge MR job)
     // since it is unknown if the merge MR will be triggered at execution time.
-    
+
     MoveWork dummyMv = new MoveWork(null, null, null,
        new LoadFileDesc(fsConf.getDirName(), finalName, true, null, null), false);
 
@@ -271,8 +271,7 @@
     // create a Map-only job for merge, otherwise create a MapReduce merge job.
     ParseContext parseCtx = ctx.getParseCtx();
     HiveConf conf = parseCtx.getConf();
-    if ((conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
-        conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) &&
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPONLY) &&
        Utilities.supportCombineFileInputFormat()) {
      // create Map-only merge job
      createMap4Merge(fsOp, ctx, finalName);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (working copy)
@@ -316,7 +316,7 @@
      newPlan.setNumMapTasks(HiveConf
          .getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
      newPlan
-          .setMinSplitSize(HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
+          .setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
      newPlan.setInputformat(HiveInputFormat.class.getName());
      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
          newPlan, jc);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (working copy)
@@ -56,6 +56,7 @@
    try {
      exitVal = tsk.executeTask();
    } catch (Throwable t) {
+      t.printStackTrace();
    }
    result.setExitVal(exitVal);
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -510,8 +510,7 @@
      job.setNumMapTasks(work.getNumMapTasks().intValue());
    }
    if (work.getMinSplitSize() != null) {
-      HiveConf.setIntVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize()
-          .intValue());
+      HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
    }
    job.setNumReduceTasks(work.getNumReduceTasks().intValue());
    job.setReducerClass(ExecReducer.class);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (working copy)
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -60,6 +61,54 @@
    super();
  }
 
+  private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
+      throws Exception {
+    FileSystem fs = sourcePath.getFileSystem(conf);
+    if (isDfsDir) {
+      // Just do a rename on the URIs, they belong to the same FS
+      String mesg = "Moving data to: " + targetPath.toString();
+      String mesg_detail = " from " + sourcePath.toString();
+      console.printInfo(mesg, mesg_detail);
+
+      // delete the output directory if it already exists
+      fs.delete(targetPath, true);
+      // if source exists, rename. Otherwise, create an empty directory
+      if (fs.exists(sourcePath)) {
+        if (!fs.rename(sourcePath, targetPath)) {
+          throw new HiveException("Unable to rename: " + sourcePath
+              + " to: " + targetPath);
+        }
+      } else if (!fs.mkdirs(targetPath)) {
+        throw new HiveException("Unable to make directory: " + targetPath);
+      }
+    } else {
+      // This is a local file
+      String mesg = "Copying data to local directory " + targetPath.toString();
+      String mesg_detail = " from " + sourcePath.toString();
+      console.printInfo(mesg, mesg_detail);
+
+      // delete the existing dest directory
+      LocalFileSystem dstFs = FileSystem.getLocal(conf);
+
+      if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
+        console.printInfo(mesg, mesg_detail);
+        // if source exists, copy. Otherwise, create an empty directory
+        if (fs.exists(sourcePath)) {
+          fs.copyToLocalFile(sourcePath, targetPath);
+        } else {
+          if (!dstFs.mkdirs(targetPath)) {
+            throw new HiveException("Unable to make local directory: "
+                + targetPath);
+          }
+        }
+      } else {
+        throw new AccessControlException(
+            "Unable to delete the existing destination directory: "
+            + targetPath);
+      }
+    }
+  }
+
  @Override
  public int execute(DriverContext driverContext) {
 
@@ -70,50 +119,24 @@
      if (lfd != null) {
        Path targetPath = new Path(lfd.getTargetDir());
        Path sourcePath = new Path(lfd.getSourceDir());
-        FileSystem fs = sourcePath.getFileSystem(conf);
-        if (lfd.getIsDfsDir()) {
-          // Just do a rename on the URIs, they belong to the same FS
-          String mesg = "Moving data to: " + lfd.getTargetDir();
-          String mesg_detail = " from " + lfd.getSourceDir();
-          console.printInfo(mesg, mesg_detail);
+        moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
+      }
 
-          // delete the output directory if it already exists
-          fs.delete(targetPath, true);
-          // if source exists, rename. Otherwise, create a empty directory
-          if (fs.exists(sourcePath)) {
-            if (!fs.rename(sourcePath, targetPath)) {
-              throw new HiveException("Unable to rename: " + sourcePath
-                  + " to: " + targetPath);
-            }
-          } else if (!fs.mkdirs(targetPath)) {
-            throw new HiveException("Unable to make directory: " + targetPath);
-          }
-        } else {
-          // This is a local file
-          String mesg = "Copying data to local directory " + lfd.getTargetDir();
-          String mesg_detail = " from " + lfd.getSourceDir();
-          console.printInfo(mesg, mesg_detail);
-
-          // delete the existing dest directory
-          LocalFileSystem dstFs = FileSystem.getLocal(conf);
-
-          if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
-            console.printInfo(mesg, mesg_detail);
-            // if source exists, rename. Otherwise, create a empty directory
-            if (fs.exists(sourcePath)) {
-              fs.copyToLocalFile(sourcePath, targetPath);
-            } else {
-              if (!dstFs.mkdirs(targetPath)) {
-                throw new HiveException("Unable to make local directory: "
-                    + targetPath);
-              }
-            }
-          } else {
-            throw new AccessControlException(
-                "Unable to delete the existing destination directory: "
-                + targetPath);
-          }
+      // Multi-file load is for dynamic partitions when some partitions do not
+      // need to be merged and can simply be moved to the target directory.
+      LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork();
+      if (lmfd != null) {
+        Path destPath = new Path(lmfd.getTargetDir());
+        FileSystem fs = destPath.getFileSystem(conf);
+        if (!fs.exists(destPath)) {
+          fs.mkdirs(destPath);
        }
+        boolean isDfsDir = lmfd.getIsDfsDir();
+        for (String s : lmfd.getSourceDirs()) {
+          Path srcPath = new Path(s);
+          Path dstPath = new Path(destPath, srcPath.getName());
+          moveFile(srcPath, dstPath, isDfsDir);
+        }
      }
 
      // Next we do this for tables and partitions
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (working copy)
@@ -36,6 +36,7 @@
  private static final long serialVersionUID = 1L;
  private LoadTableDesc loadTableWork;
  private LoadFileDesc loadFileWork;
+  private LoadMultiFilesDesc loadMultiFilesWork;
  private boolean checkFileFormat;
 
  ArrayList dpSpecPaths; // dynamic partition specified paths -- the root of DP columns
 
@@ -93,6 +94,15 @@
    return loadFileWork;
  }
 
+  @Explain(displayName = "files")
+  public LoadMultiFilesDesc getLoadMultiFilesWork() {
+    return loadMultiFilesWork;
+  }
+
+  public void setMultiFilesDesc(LoadMultiFilesDesc lmfd) {
+    this.loadMultiFilesWork = lmfd;
+  }
+
  public void setLoadFileWork(final LoadFileDesc loadFileWork) {
    this.loadFileWork = loadFileWork;
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (working copy)
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@
 public class ConditionalResolverMergeFiles implements ConditionalResolver,
    Serializable {
  private static final long serialVersionUID = 1L;
+  static final private Log LOG = LogFactory.getLog(ConditionalResolverMergeFiles.class.getName());
 
  public ConditionalResolverMergeFiles() {
  }
@@ -118,88 +121,137 @@
        .getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESAVGSIZE);
    trgtSize = trgtSize > avgConditionSize ? trgtSize : avgConditionSize;
 
+    Task<? extends Serializable> mvTask = ctx.getListTasks().get(0);
+    Task<? extends Serializable> mrTask = ctx.getListTasks().get(1);
+
    try {
-      // If the input file does not exist, replace it by a empty file
      Path dirPath = new Path(dirName);
      FileSystem inpFs = dirPath.getFileSystem(conf);
-
+      DynamicPartitionCtx dpCtx = ctx.getDPCtx();
+
      if (inpFs.exists(dirPath)) {
-        DynamicPartitionCtx dpCtx = ctx.getDPCtx();
-        boolean doMerge = false;
-        FileStatus[] fStats = null;
-        if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
-          fStats = Utilities.getFileStatusRecurse(dirPath,
-              dpCtx.getNumDPCols() + 1, inpFs);
+        // For each dynamic partition, check if it needs to be merged.
+        MapredWork work = (MapredWork) mrTask.getWork();
-        } else {
-          fStats = inpFs.listStatus(dirPath);
-        }
+        // Dynamic partition: replace input path (root to dp paths) with dynamic partition
+        // input paths.
+        if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
-        long totalSz = 0;
-        for (FileStatus fStat : fStats) {
-          totalSz += fStat.getLen();
-        }
-        long currAvgSz = totalSz / fStats.length;
-        doMerge = (currAvgSz < avgConditionSize) && (fStats.length > 1);
+          // get list of dynamic partitions
+          FileStatus[] status = Utilities.getFileStatusRecurse(dirPath,
+              dpCtx.getNumDPCols(), inpFs);
-        if (doMerge) {
-          //
-          // for each dynamic partition, generate a merge task
-          // populate aliasToWork, pathToPartitionInfo, pathToAlias
-          // also set the number of reducers
-          //
-          Task<? extends Serializable> tsk = ctx.getListTasks().get(1);
-          MapredWork work = (MapredWork) tsk.getWork();
+          // cleanup pathToPartitionInfo
+          Map<String, PartitionDesc> ptpi = work.getPathToPartitionInfo();
+          assert ptpi.size() == 1;
+          String path = ptpi.keySet().iterator().next();
+          TableDesc tblDesc = ptpi.get(path).getTableDesc();
+          ptpi.remove(path); // the root path is not useful anymore
+          // cleanup pathToAliases
+          Map<String, ArrayList<String>> pta = work.getPathToAliases();
+          assert pta.size() == 1;
+          path = pta.keySet().iterator().next();
+          ArrayList<String> aliases = pta.get(path);
+          pta.remove(path); // the root path is not useful anymore
 
-          // Dynamic partition: replace input path (root to dp paths) with dynamic partition
-          // input paths.
-          if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
-            FileStatus[] status = Utilities.getFileStatusRecurse(dirPath,
-                dpCtx.getNumDPCols(), inpFs);
+          // populate pathToPartitionInfo and pathToAliases w/ DP paths
+          long totalSz = 0;
+          boolean doMerge = false;
+          // list of paths that don't need to be merged but need to be moved to the dest location
+          List<String> toMove = new ArrayList<String>();
+          for (int i = 0; i < status.length; ++i) {
+            long len = getMergeSize(inpFs, status[i].getPath(), avgConditionSize);
+            if (len >= 0) {
+              doMerge = true;
+              totalSz += len;
+              work.getPathToAliases().put(status[i].getPath().toString(), aliases);
+              // get the full partition spec from the path and update the PartitionDesc
+              Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
+                  dpCtx.getPartSpec());
+              Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
+              PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap<String, String>) fullPartSpec);
+              work.getPathToPartitionInfo().put(status[i].getPath().toString(), pDesc);
+            } else {
+              toMove.add(status[i].getPath().toString());
+            }
+          }
+          if (doMerge) {
+            // add the merge MR job
+            setupMapRedWork(conf, work, trgtSize, totalSz);
+            resTsks.add(mrTask);
-          // cleanup pathToPartitionInfo
-          Map<String, PartitionDesc> ptpi = work.getPathToPartitionInfo();
-          assert ptpi.size() == 1;
-          String path = ptpi.keySet().iterator().next();
-          TableDesc tblDesc = ptpi.get(path).getTableDesc();
-          ptpi.remove(path); // the root path is not useful anymore
-
-          // cleanup pathToAliases
-          Map<String, ArrayList<String>> pta = work.getPathToAliases();
-          assert pta.size() == 1;
-          path = pta.keySet().iterator().next();
-          ArrayList<String> aliases = pta.get(path);
-          pta.remove(path); // the root path is not useful anymore
-
-          // populate pathToPartitionInfo and pathToAliases w/ DP paths
-          for (int i = 0; i < status.length; ++i) {
-            work.getPathToAliases().put(status[i].getPath().toString(), aliases);
-            // get the full partition spec from the path and update the PartitionDesc
-            Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
-                dpCtx.getPartSpec());
-            Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
-            PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap<String, String>) fullPartSpec);
-            work.getPathToPartitionInfo().put(
-                status[i].getPath().toString(),
-                pDesc);
+            // add the move task for those partitions that do not need merging
+            if (toMove.size() > 0) {
+              // modify the existing move task as it is already in the candidate running tasks
+              MoveWork mvWork = (MoveWork) mvTask.getWork();
+              LoadFileDesc lfd = mvWork.getLoadFileWork();
+              LoadMultiFilesDesc lmfd = new LoadMultiFilesDesc(toMove,
+                  lfd.getTargetDir(), lfd.getIsDfsDir(), lfd.getColumns(), lfd.getColumnTypes());
+              mvWork.setLoadFileWork(null);
+              mvWork.setLoadTableWork(null);
+              mvWork.setMultiFilesDesc(lmfd);
+              resTsks.add(mvTask);
            }
-          } else {
-            int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-            int reducers = (int) ((totalSz + trgtSize - 1) / trgtSize);
-            reducers = Math.max(1, reducers);
-            reducers = Math.min(maxReducers, reducers);
-            work.setNumReduceTasks(reducers);
+          } else { // add the move task
+            resTsks.add(mvTask);
          }
-
-          resTsks.add(tsk);
-          return resTsks;
+        } else { // no dynamic partitions
+          long totalSz = getMergeSize(inpFs, dirPath, avgConditionSize);
+          if (totalSz >= 0) { // add the merge job
+            setupMapRedWork(conf, work, trgtSize, totalSz);
+            resTsks.add(mrTask);
+          } else { // don't need to merge, add the move job
+            resTsks.add(mvTask);
+          }
        }
+      } else {
+        resTsks.add(mvTask);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
-    resTsks.add(ctx.getListTasks().get(0));
    return resTsks;
  }
+
+  private void setupMapRedWork(HiveConf conf, MapredWork work, long targetSize, long totalSize) {
+    if (work.getNumReduceTasks() > 0) {
+      int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+      int reducers = (int) ((totalSize + targetSize - 1) / targetSize);
+      reducers = Math.max(1, reducers);
+      reducers = Math.min(maxReducers, reducers);
+      work.setNumReduceTasks(reducers);
+    }
+    work.setMinSplitSize(targetSize);
+  }
+
+  /**
+   * Whether the files inside a directory should be merged, given the threshold on the average file size.
+   *
+   * @param inpFs input file system.
+   * @param dirPath input file directory.
+   * @param avgSize threshold of the average file size.
+   * @return -1 if no merge is needed (either because there is only one file or the
+   * average file size is larger than avgSize); otherwise the total size of the files.
+   * A return value of 0 means there are multiple files, each of which is empty.
+   * This can happen when the table is bucketized and all buckets are empty.
+   */
+  private long getMergeSize(FileSystem inpFs, Path dirPath, long avgSize) {
+    try {
+      FileStatus[] fStats = inpFs.listStatus(dirPath);
+      if (fStats.length <= 1) {
+        return -1;
+      }
+      long totalSz = 0;
+      for (FileStatus fStat : fStats) {
+        totalSz += fStat.getLen();
+      }
+      if (totalSz < avgSize * fStats.length) {
+        return totalSz;
+      } else {
+        return -1;
+      }
+    } catch (IOException e) {
+      return -1;
+    }
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1052182)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -60,7 +60,7 @@
 
  private Integer numReduceTasks;
  private Integer numMapTasks;
-  private Integer minSplitSize;
+  private Long minSplitSize;
 
  private boolean needsTagging;
  private boolean hadoopSupportsSplittable;
@@ -315,11 +315,11 @@
    this.hadoopSupportsSplittable = hadoopSupportsSplittable;
  }
 
-  public Integer getMinSplitSize() {
+  public Long getMinSplitSize() {
    return minSplitSize;
  }
 
-  public void setMinSplitSize(Integer minSplitSize) {
+  public void setMinSplitSize(Long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java (revision 0)
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * LoadMultiFilesDesc.
+ *
+ */
+public class LoadMultiFilesDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String targetDir;
+  private boolean isDfsDir;
+  // list of columns, comma separated
+  private String columns;
+  private String columnTypes;
+  private List<String> srcDirs;
+
+  public LoadMultiFilesDesc() {
+  }
+
+  public LoadMultiFilesDesc(final List<String> sourceDirs, final String targetDir,
+      final boolean isDfsDir, final String columns, final String columnTypes) {
+
+    this.srcDirs = sourceDirs;
+    this.targetDir = targetDir;
+    this.isDfsDir = isDfsDir;
+    this.columns = columns;
+    this.columnTypes = columnTypes;
+  }
+
+  @Explain(displayName = "destination")
+  public String getTargetDir() {
+    return targetDir;
+  }
+
+  @Explain(displayName = "sources")
+  public List<String> getSourceDirs() {
+    return srcDirs;
+  }
+
+  public void setSourceDirs(List<String> srcs) {
+    this.srcDirs = srcs;
+  }
+
+  public void setTargetDir(final String targetDir) {
+    this.targetDir = targetDir;
+  }
+
+  @Explain(displayName = "hdfs directory")
+  public boolean getIsDfsDir() {
+    return isDfsDir;
+  }
+
+  public void setIsDfsDir(final boolean isDfsDir) {
+    this.isDfsDir = isDfsDir;
+  }
+
+  /**
+   * @return the columns
+   */
+  public String getColumns() {
+    return columns;
+  }
+
+  /**
+   * @param columns
+   *          the columns to set
+   */
+  public void setColumns(String columns) {
+    this.columns = columns;
+  }
+
+  /**
+   * @return the columnTypes
+   */
+  public String getColumnTypes() {
+    return columnTypes;
+  }
+
+  /**
+   * @param columnTypes
+   *          the columnTypes to set
+   */
+  public void setColumnTypes(String columnTypes) {
+    this.columnTypes = columnTypes;
+  }
+}