Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1872)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -61,6 +61,7 @@
     SCRATCHDIR("hive.exec.scratchdir", "/tmp/"+System.getProperty("user.name")+"/hive"),
     SUBMITVIACHILD("hive.exec.submitviachild", false),
     SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
+    IGNOREPIPEBROKEN("hive.exec.script.ignore.pipe.broken.exception", false),
     COMPRESSRESULT("hive.exec.compress.output", false),
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
@@ -69,7 +70,7 @@
     MAXREDUCERS("hive.exec.reducers.max", 999),
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
-
+
     // hadoop stuff
     HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
     HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
Index: ql/src/test/results/clientnegative/script_broken_pipe2.q.out
===================================================================
--- ql/src/test/results/clientnegative/script_broken_pipe2.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/script_broken_pipe2.q.out	(revision 0)
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp()
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1277151208/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientnegative/script_broken_pipe1.q.out
===================================================================
--- ql/src/test/results/clientnegative/script_broken_pipe1.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/script_broken_pipe1.q.out	(revision 0)
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close()
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/376655155/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
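Note: the "Broken pipe" IOException that both negative tests provoke is ordinary Unix behavior, reproducible outside Hive. The following standalone sketch (class name, loop bound, and sample row are invented for illustration) writes to a 'true' child process, which exits without reading stdin, until the closed read end of the pipe surfaces as an IOException:

import java.io.IOException;
import java.io.OutputStream;

public class BrokenPipeDemo {
  public static void main(String[] args) throws Exception {
    // 'true' exits immediately without reading stdin, like the script
    // in script_broken_pipe1.q above.
    Process p = new ProcessBuilder("true").start();
    p.waitFor();
    OutputStream out = p.getOutputStream();
    try {
      // Keep writing until the OS reports the closed pipe.
      for (int i = 0; i < 1000000; i++) {
        out.write("238\tval_238\n".getBytes());
        out.flush();
      }
    } catch (IOException e) {
      // On Linux JVMs this prints "Broken pipe"; the exact message text is
      // platform- and JDK-dependent, which is what IGNOREPIPEBROKEN matches on.
      System.err.println("IOException: " + e.getMessage());
    } finally {
      try { out.close(); } catch (IOException ignored) { }
    }
  }
}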
Index: ql/src/test/results/clientpositive/script_pipe.q.out
===================================================================
--- ql/src/test/results/clientpositive/script_pipe.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/script_pipe.q.out	(revision 0)
@@ -0,0 +1,124 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close()
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.close()
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST TOK_ALLCOLREF) TOK_SERDE TOK_RECORDWRITER 'true' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c)))) (TOK_LIMIT 1)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Transform Operator
+                command: true
+                output info:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                Limit
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 1
+
+
+PREHOOK: query: EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)) TOK_SERDE TOK_RECORDWRITER 'head -n 1' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c d))))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
+              Transform Operator
+                command: head -n 1
+                output info:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/2002442720/10000
+POSTHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/2002442720/10000
+PREHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/871946213/10000
+POSTHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/871946213/10000
+238	val_238	238	val_238
Index: ql/src/test/queries/clientnegative/script_broken_pipe1.q
===================================================================
--- ql/src/test/queries/clientnegative/script_broken_pipe1.q	(revision 0)
+++ ql/src/test/queries/clientnegative/script_broken_pipe1.q	(revision 0)
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.close()
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1;
Index: ql/src/test/queries/clientnegative/script_broken_pipe2.q
===================================================================
--- ql/src/test/queries/clientnegative/script_broken_pipe2.q	(revision 0)
+++ ql/src/test/queries/clientnegative/script_broken_pipe2.q	(revision 0)
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.processOp()
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
\ No newline at end of file
Index: ql/src/test/queries/clientpositive/script_pipe.q
===================================================================
--- ql/src/test/queries/clientpositive/script_pipe.q	(revision 0)
+++ ql/src/test/queries/clientpositive/script_pipe.q	(revision 0)
@@ -0,0 +1,9 @@
+set hive.exec.script.ignore.pipe.broken.exception = true;
+
+-- Tests exception in ScriptOperator.close()
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1;
+-- Tests exception in ScriptOperator.processOp()
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
+
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM src LIMIT 1;
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
\ No newline at end of file
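Note: script_pipe.q enables the new flag for the session, so the same two pipelines that fail in the negative tests above run to completion. For completeness, a minimal sketch of the equivalent programmatic round-trip — the bare Configuration and class name here are invented for illustration, standing in for the job configuration that ScriptOperator actually receives as hconf:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class IgnorePipeBrokenConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Equivalent of "set hive.exec.script.ignore.pipe.broken.exception = true;"
    // at the top of script_pipe.q.
    conf.setBoolean(HiveConf.ConfVars.IGNOREPIPEBROKEN.varname, true);
    // Same lookup ScriptOperator performs (see the diff below).
    boolean ignore = HiveConf.getBoolVar(conf, HiveConf.ConfVars.IGNOREPIPEBROKEN);
    System.out.println("ignore broken pipe = " + ignore);  // prints true
  }
}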
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java	(revision 1872)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java	(working copy)
@@ -22,6 +22,7 @@
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -70,6 +71,8 @@
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter;
 
+  static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
+
   /**
    * Timer to send periodic reports back to the tracker.
    */
@@ -270,6 +273,12 @@
     }
   }
 
+  private boolean isIgnorableBrokenPipeException(IOException e) {
+    // equalsIgnoreCase on the constant is null-safe when the exception carries no message.
+    return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.IGNOREPIPEBROKEN) &&
+        IO_EXCEPTION_BROKEN_PIPE_STRING.equalsIgnoreCase(e.getMessage());
+  }
+
   public void processOp(Object row, int tag) throws HiveException {
 
     if(scriptError != null) {
@@ -285,9 +294,14 @@
       serialize_error_count.set(serialize_error_count.get() + 1);
       throw new HiveException(e);
     } catch (IOException e) {
-      LOG.error("Error in writing to script: " + e.getMessage());
-      scriptError = e;
-      throw new HiveException(e);
+      if (isIgnorableBrokenPipeException(e)) {
+        setDone(true);
+        LOG.error("Got broken pipe: ignoring exception and setting operator to done");
+      } else {
+        LOG.error("Error in writing to script: " + e.getMessage());
+        scriptError = e;
+        throw new HiveException(e);
+      }
     }
   }
 
@@ -307,9 +321,13 @@
           new_abort = true;
         };
       } catch (IOException e) {
-        LOG.error("Got ioexception: " + e.getMessage());
-        e.printStackTrace();
-        new_abort = true;
+        if (isIgnorableBrokenPipeException(e)) {
+          LOG.error("Got broken pipe: ignoring");
+        } else {
+          LOG.error("Got ioexception: " + e.getMessage());
+          e.printStackTrace();
+          new_abort = true;
+        }
       } catch (InterruptedException e) {
       }
     } else {
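One caveat worth noting: the fix keys off the literal exception message, which the JDK emits for EPIPE on most Unix platforms but which no API contract guarantees; the patch also imports java.io.EOFException, presumably for related handling outside these hunks. A hypothetical regression check for just the matching predicate — the helper mirrors isIgnorableBrokenPipeException() but stands alone, with the config lookup omitted; run with "java -ea" so the asserts fire:

import java.io.IOException;

public class BrokenPipeMatchCheck {
  static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";

  static boolean looksLikeBrokenPipe(IOException e) {
    // The constant is the receiver, so a null message is handled safely.
    return IO_EXCEPTION_BROKEN_PIPE_STRING.equalsIgnoreCase(e.getMessage());
  }

  public static void main(String[] args) {
    assert looksLikeBrokenPipe(new IOException("Broken pipe"));
    assert looksLikeBrokenPipe(new IOException("broken PIPE"));
    assert !looksLikeBrokenPipe(new IOException("Stream closed"));
    assert !looksLikeBrokenPipe(new IOException());  // null message
    System.out.println("all checks passed");
  }
}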