Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 982092)
+++ conf/hive-default.xml (working copy)
@@ -588,6 +588,13 @@
<property>
+  <name>hive.exec.max.created.files</name>
+  <value>100000</value>
+  <description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.
+  </description>
+</property>
+
+<property>
  <name>hive.default.partition.name</name>
  <value>__HIVE_DEFAULT_PARTITION__</value>
  <description>The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). The user has to be aware that the dynamic partition value should not contain this value to avoid confusion.</description>
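[Illustrative sketch, not part of the patch] A minimal example of how the new limit is consumed on the client side: the value of hive.exec.max.created.files is read through HiveConf.getLongVar (the same accessor the ExecDriver change below uses) and compared against the number of files the job reports having created. The class and method names here are made up for illustration only.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class CreatedFilesLimitSketch {
  // Returns true when the number of files created by the job exceeds
  // hive.exec.max.created.files (ConfVars.MAXCREATEDFILES, default 100000).
  public static boolean exceedsLimit(JobConf job, long numCreatedFiles) {
    long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
    return numCreatedFiles > upperLimit;
  }
}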
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 982092)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -91,6 +91,7 @@
DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
+ MAXCREATEDFILES("hive.exec.max.created.files", 100000),
DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
Index: ql/src/test/results/clientnegative/dyn_part3.q.out
===================================================================
--- ql/src/test/results/clientnegative/dyn_part3.q.out (revision 0)
+++ ql/src/test/results/clientnegative/dyn_part3.q.out (revision 0)
@@ -0,0 +1,9 @@
+PREHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@nzhang_part
+PREHOOK: query: insert overwrite table nzhang_part partition(value) select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/queries/clientnegative/dyn_part3.q
===================================================================
--- ql/src/test/queries/clientnegative/dyn_part3.q (revision 0)
+++ ql/src/test/queries/clientnegative/dyn_part3.q (revision 0)
@@ -0,0 +1,9 @@
+set hive.exec.max.dynamic.partitions=600;
+set hive.exec.max.dynamic.partitions.pernode=600;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.created.files=100;
+
+create table nzhang_part( key string) partitioned by (value string);
+
+insert overwrite table nzhang_part partition(value) select key, value from src;
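[Note, not part of the patch] This negative test lowers hive.exec.max.created.files to 100 while src contains several hundred distinct values, so the dynamic-partition insert creates more output files than the limit allows; the new CREATED_FILES check in ExecDriver then declares a fatal error, which is the "return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask" failure recorded in dyn_part3.q.out above.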
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (working copy)
@@ -25,8 +25,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -185,7 +185,7 @@
* Initializes this map op as the root of the tree. It sets JobConf &
* MapRedWork and starts initialization of the operator tree rooted at this
* op.
- *
+ *
* @param hconf
* @param mrwork
* @throws HiveException
@@ -292,7 +292,7 @@
Operator<? extends Serializable> op = conf.getAliasToWork().get(
onealias);
LOG.info("Adding alias " + onealias + " to work list for file "
- + fpath.toUri().getPath());
+ + onefile);
MapInputPath inp = new MapInputPath(onefile, onealias, op);
opCtxMap.put(inp, opCtx);
if (operatorToPaths.get(op) == null) {
@@ -449,15 +449,15 @@
try {
rawRowString = value.toString();
} catch (Exception e2) {
- rawRowString = "[Error getting row data with exception " +
+ rawRowString = "[Error getting row data with exception " +
StringUtils.stringifyException(e2) + " ]";
}
-
+
// TODO: policy on deserialization errors
deserialize_error_count.set(deserialize_error_count.get() + 1);
throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
}
-
+
try {
if (this.hasVC) {
forward(this.rowWithPartAndVC, this.rowObjectInspector);
@@ -478,7 +478,7 @@
rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
}
} catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
+ rowString = "[Error getting row data with exception " +
StringUtils.stringifyException(e2) + " ]";
}
throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -49,10 +49,10 @@
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -463,6 +463,10 @@
// buckets of dynamic partitions will be created for each newly created partition
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+ // increment the CREATED_FILES counter
+ if (reporter != null) {
+ reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
+ }
filesIdx++;
}
assert filesIdx == numFiles;
@@ -517,9 +521,7 @@
}
try {
- if (reporter != null) {
- reporter.progress();
- }
+ updateProgress();
// if DP is enabled, get the final output writers and prepare the real output row
assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT:
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -29,7 +29,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -845,6 +844,7 @@
* TODO This is a hack for hadoop 0.17 which only supports enum counters.
*/
public static enum ProgressCounter {
+ CREATED_FILES,
C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
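[Illustrative sketch, not part of the patch] For reference, the counter round trip this enum entry enables: tasks bump CREATED_FILES through the Reporter (as the FileSinkOperator change above does), and the client reads the aggregated value back from the job's Counters (as the ExecDriver change below does). The helper class and method names are illustrative only.

import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Reporter;

public class CreatedFilesCounterSketch {
  // Task side: called each time a new output file is opened.
  public static void recordFileCreated(Reporter reporter) {
    if (reporter != null) {
      reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
    }
  }

  // Client side: read the aggregated count from the job's counters.
  public static long createdFiles(Counters ctrs) {
    return ctrs.getCounter(ProgressCounter.CREATED_FILES);
  }
}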
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 982092)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -19,14 +19,12 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
-import java.net.URI;
+import java.net.URL;
import java.net.URLDecoder;
-import java.net.URL;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -38,22 +36,21 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
@@ -61,9 +58,9 @@
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -77,14 +74,13 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.varia.NullAppender;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.varia.NullAppender;
/**
* ExecDriver.
@@ -293,6 +289,13 @@
// we may still be able to retrieve the job status - so ignore
return false;
}
+ // check for number of created files
+ long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+ long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+ if (numFiles > upperLimit) {
+ errMsg.append("total number of created files exceeds ").append(upperLimit);
+ return true;
+ }
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
@@ -640,8 +643,9 @@
} finally {
Utilities.clearMapRedWork(job);
try {
- if(ctxCreated)
+ if(ctxCreated) {
ctx.clear();
+ }
if (rj != null) {
if (returnVal != 0) {
@@ -895,9 +899,10 @@
private static void setupChildLog4j(Configuration conf) {
URL hive_l4j = ExecDriver.class.getClassLoader().getResource
(SessionState.HIVE_EXEC_L4J);
- if(hive_l4j == null)
+ if(hive_l4j == null) {
hive_l4j = ExecDriver.class.getClassLoader().getResource
(SessionState.HIVE_L4J);
+ }
if (hive_l4j != null) {
// setting queryid so that log4j configuration can use it to generate
@@ -1072,13 +1077,13 @@
sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
+ Utilities.randGen.nextInt(), "UTF-8"));
}
-
+
return sb.toString();
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
-
+
@Override
public boolean isMapRedTask() {
return true;
@@ -1244,8 +1249,9 @@
if (m != null) {
for (FetchWork fw: m.values()) {
String s = fw.getTblDir();
- if ((s != null) && ctx.isMRTmpFileURI(s))
+ if ((s != null) && ctx.isMRTmpFileURI(s)) {
fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+ }
}
}
}
@@ -1253,27 +1259,30 @@
// fix up outputs
Map<String, ArrayList<String>> pa = work.getPathToAliases();
if (pa != null) {
- for (List<String> ls: pa.values())
+ for (List<String> ls: pa.values()) {
for (String a: ls) {
ArrayList<Operator<? extends Serializable>> opList = new
ArrayList<Operator<? extends Serializable>> ();
opList.add(work.getAliasToWork().get(a));
-
+
while (!opList.isEmpty()) {
Operator<? extends Serializable> op = opList.remove(0);
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
String s = fdesc.getDirName();
- if ((s != null) && ctx.isMRTmpFileURI(s))
+ if ((s != null) && ctx.isMRTmpFileURI(s)) {
fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+ }
((FileSinkOperator)op).setConf(fdesc);
}
- if (op.getChildOperators() != null)
+ if (op.getChildOperators() != null) {
opList.addAll(op.getChildOperators());
+ }
}
}
+ }
}
}
}