Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 982092)
+++ conf/hive-default.xml	(working copy)
@@ -588,6 +588,13 @@
 
+<property>
+  <name>hive.exec.max.created.files</name>
+  <value>100000</value>
+  <description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.</description>
+</property>
+
 <property>
   <name>hive.default.partition.name</name>
   <value>__HIVE_DEFAULT_PARTITION__</value>
   <description>The default partition name in case the dynamic partition column value is null/empty string or anyother values that cannot be escaped. This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). The user has to be aware that the dynamic partition value should not contain this value to avoid confusions.</description>
 </property>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 982092)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -91,6 +91,7 @@
     DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
     DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
     DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
+    MAXCREATEDFILES("hive.exec.max.created.files", 100000L),
     DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
Index: ql/src/test/results/clientnegative/dyn_part3.q.out
===================================================================
--- ql/src/test/results/clientnegative/dyn_part3.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/dyn_part3.q.out	(revision 0)
@@ -0,0 +1,9 @@
+PREHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@nzhang_part
+PREHOOK: query: insert overwrite table nzhang_part partition(value) select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/queries/clientnegative/dyn_part3.q
===================================================================
--- ql/src/test/queries/clientnegative/dyn_part3.q	(revision 0)
+++ ql/src/test/queries/clientnegative/dyn_part3.q	(revision 0)
@@ -0,0 +1,9 @@
+set hive.exec.max.dynamic.partitions=600;
+set hive.exec.max.dynamic.partitions.pernode=600;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.created.files=100;
+
+create table nzhang_part( key string) partitioned by (value string);
+
+insert overwrite table nzhang_part partition(value) select key, value from src;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(working copy)
@@ -25,8 +25,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -185,7 +185,7 @@
    * Initializes this map op as the root of the tree. It sets JobConf &
    * MapRedWork and starts initialization of the operator tree rooted at this
    * op.
-   * 
+   *
    * @param hconf
    * @param mrwork
    * @throws HiveException
@@ -292,7 +292,7 @@
       Operator<? extends Serializable> op = conf.getAliasToWork().get(
           onealias);
       LOG.info("Adding alias " + onealias + " to work list for file "
-          + fpath.toUri().getPath());
+          + onefile);
       MapInputPath inp = new MapInputPath(onefile, onealias, op);
       opCtxMap.put(inp, opCtx);
       if (operatorToPaths.get(op) == null) {
@@ -449,15 +449,15 @@
       try {
         rawRowString = value.toString();
       } catch (Exception e2) {
-        rawRowString = "[Error getting row data with exception " + 
+        rawRowString = "[Error getting row data with exception " +
            StringUtils.stringifyException(e2) + " ]";
       }
-      
+
       // TODO: policy on deserialization errors
       deserialize_error_count.set(deserialize_error_count.get() + 1);
       throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
     }
-    
+
     try {
       if (this.hasVC) {
         forward(this.rowWithPartAndVC, this.rowObjectInspector);
@@ -478,7 +478,7 @@
           rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
         }
       } catch (Exception e2) {
-        rowString = "[Error getting row data with exception " + 
+        rowString = "[Error getting row data with exception " +
            StringUtils.stringifyException(e2) + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(working copy)
@@ -49,10 +49,10 @@
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -463,6 +463,9 @@
       // buckets of dynamic partitions will be created for each newly created partition
       fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
           jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+      // increment the CREATED_FILES counter
+      ProgressCounter pc = ProgressCounter.valueOf("CREATED_FILES");
+      reporter.incrCounter(pc, 1);
       filesIdx++;
     }
     assert filesIdx == numFiles;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(working copy)
@@ -29,7 +29,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -845,6 +844,7 @@
    * TODO This is a hack for hadoop 0.17 which only supports enum counters.
   */
  public static enum ProgressCounter {
+    CREATED_FILES,
     C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
     C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
     C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -19,14 +19,12 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
+import java.net.URL;
 import java.net.URLDecoder;
-import java.net.URL;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -38,22 +36,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
@@ -61,9 +58,9 @@
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -77,14 +74,13 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.varia.NullAppender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.varia.NullAppender;
 
 /**
  * ExecDriver.
@@ -363,6 +359,17 @@
           rj.killJob();
           continue;
         }
+        // check for number of created files
+        Counters ctrs = th.getCounters();
+        ProgressCounter pc = ProgressCounter.valueOf("CREATED_FILES");
+        long numFiles = ctrs.getCounter(pc);
+        long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+        if (numFiles > upperLimit) {
+          fatal = true;
+          console.printError("[Fatal Error] total number of files exceeds " + upperLimit);
+          rj.killJob();
+          continue;
+        }
         errMsg.setLength(0);
         updateCounters(th);
@@ -640,8 +647,9 @@
     } finally {
       Utilities.clearMapRedWork(job);
       try {
-        if(ctxCreated)
+        if(ctxCreated) {
           ctx.clear();
+        }
 
         if (rj != null) {
           if (returnVal != 0) {
@@ -895,9 +903,10 @@
   private static void setupChildLog4j(Configuration conf) {
     URL hive_l4j = ExecDriver.class.getClassLoader().getResource
         (SessionState.HIVE_EXEC_L4J);
-    if(hive_l4j == null)
+    if(hive_l4j == null) {
       hive_l4j = ExecDriver.class.getClassLoader().getResource
           (SessionState.HIVE_L4J);
+    }
 
     if (hive_l4j != null) {
       // setting queryid so that log4j configuration can use it to generate
@@ -1072,13 +1081,13 @@
         sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
             + Utilities.randGen.nextInt(), "UTF-8"));
       }
-      
+
       return sb.toString();
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
   }
-  
+
   @Override
   public boolean isMapRedTask() {
     return true;
@@ -1244,8 +1253,9 @@
       if (m != null) {
         for (FetchWork fw: m.values()) {
           String s = fw.getTblDir();
-          if ((s != null) && ctx.isMRTmpFileURI(s))
+          if ((s != null) && ctx.isMRTmpFileURI(s)) {
             fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+          }
         }
       }
     }
@@ -1253,27 +1263,30 @@
     // fix up outputs
     Map<String, ArrayList<String>> pa = work.getPathToAliases();
     if (pa != null) {
-      for (List<String> ls: pa.values())
+      for (List<String> ls: pa.values()) {
         for (String a: ls) {
           ArrayList<Operator<? extends Serializable>> opList =
               new ArrayList<Operator<? extends Serializable>> ();
           opList.add(work.getAliasToWork().get(a));
-          
+
           while (!opList.isEmpty()) {
             Operator<? extends Serializable> op = opList.remove(0);
             if (op instanceof FileSinkOperator) {
               FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
               String s = fdesc.getDirName();
-              if ((s != null) && ctx.isMRTmpFileURI(s))
+              if ((s != null) && ctx.isMRTmpFileURI(s)) {
                 fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+              }
               ((FileSinkOperator)op).setConf(fdesc);
             }
-            if (op.getChildOperators() != null)
+            if (op.getChildOperators() != null) {
              opList.addAll(op.getChildOperators());
+            }
           }
         }
+      }
     }
   }
 }
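
The following is a minimal, illustrative sketch (not part of the patch) of the mechanism the changes above wire together: each task bumps the new CREATED_FILES enum counter through the MapReduce Reporter whenever it opens a record writer (cf. the FileSinkOperator hunk), and the client-side job monitor reads the aggregated counter and kills the job once it exceeds hive.exec.max.created.files (cf. the ExecDriver polling loop). The class FileLimitSketch, its method names, and the use of the raw property name instead of HiveConf.ConfVars.MAXCREATEDFILES are assumptions made only to keep the example self-contained.

import java.io.IOException;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;

public class FileLimitSketch {

  // Mirrors Operator.ProgressCounter in the patch: counters in this Hadoop era
  // must be enums, which is why CREATED_FILES is added to that existing enum.
  public enum ProgressCounter {
    CREATED_FILES
  }

  // Task side: called once for every output file the task opens.
  public static void recordFileCreated(Reporter reporter) {
    reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
  }

  // Client side: polled while the job runs; returns true if the job was killed
  // because the tasks collectively created too many files.
  public static boolean exceededFileLimit(RunningJob rj, JobConf job) throws IOException {
    Counters ctrs = rj.getCounters();
    if (ctrs == null) {
      return false; // counters may not be available yet
    }
    long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
    long upperLimit = job.getLong("hive.exec.max.created.files", 100000L);
    if (numFiles > upperLimit) {
      rj.killJob(); // fail fast instead of flooding the HDFS namespace
      return true;
    }
    return false;
  }
}

Checking the limit on the client rather than inside each task is a deliberate choice: no single mapper or reducer knows the global file count, but the framework aggregates the enum counter across all tasks, so the driver can enforce one job-wide cap. This is also why the negative test dyn_part3.q only needs to lower hive.exec.max.created.files to 100 to force the "return code 2" failure recorded in dyn_part3.q.out.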