Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1872) +++ conf/hive-default.xml (working copy) @@ -280,6 +280,13 @@ + hive.exec.script.allow.partial.consumption + false + When enabled, this option allows a user script to exit successfully without consuming all the data from the standard input. + + + + hive.exec.compress.output false This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1872) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -61,6 +61,7 @@ SCRATCHDIR("hive.exec.scratchdir", "/tmp/"+System.getProperty("user.name")+"/hive"), SUBMITVIACHILD("hive.exec.submitviachild", false), SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000), + ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false), COMPRESSRESULT("hive.exec.compress.output", false), COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false), COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""), Index: ql/src/test/results/clientnegative/script_broken_pipe2.q.out =================================================================== --- ql/src/test/results/clientnegative/script_broken_pipe2.q.out (revision 0) +++ ql/src/test/results/clientnegative/script_broken_pipe2.q.out (revision 0) @@ -0,0 +1,6 @@ +PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer +SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src 
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/588453066/10000 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask Index: ql/src/test/results/clientnegative/script_broken_pipe1.q.out =================================================================== --- ql/src/test/results/clientnegative/script_broken_pipe1.q.out (revision 0) +++ ql/src/test/results/clientnegative/script_broken_pipe1.q.out (revision 0) @@ -0,0 +1,6 @@ +PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data +SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1832401066/10000 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask Index: ql/src/test/results/clientnegative/script_broken_pipe3.q.out =================================================================== --- ql/src/test/results/clientnegative/script_broken_pipe3.q.out (revision 0) +++ ql/src/test/results/clientnegative/script_broken_pipe3.q.out (revision 0) @@ -0,0 +1,6 @@ +PREHOOK: query: -- Test to ensure that a script with a bad error code still fails even with partial consumption +SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1937270363/10000 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask Index: ql/src/test/results/clientpositive/script_pipe.q.out =================================================================== --- ql/src/test/results/clientpositive/script_pipe.q.out (revision 0) +++ ql/src/test/results/clientpositive/script_pipe.q.out (revision 0) @@ -0,0 +1,144 @@ +PREHOOK: query: -- Tests exception in 
ScriptOperator.close() by passing to the operator a small amount of data +EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +PREHOOK: type: QUERY +POSTHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data +EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 1))) tmp)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST TOK_ALLCOLREF) TOK_SERDE TOK_RECORDWRITER 'true' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c)))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + tmp:src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Limit + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + Reduce Operator Tree: + Extract + Limit + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + outputColumnNames: _col0, _col1 + Transform Operator + command: true + output info: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe 
buffer +EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src +PREHOOK: type: QUERY +POSTHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer +EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)) TOK_SERDE TOK_RECORDWRITER 'head -n 1' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c d)))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + expr: key + type: string + expr: value + type: string + expr: key + type: string + expr: value + type: string + expr: key + type: string + expr: value + type: string + expr: key + type: string + expr: value + type: string + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11 + Transform Operator + command: head -n 1 + output info: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000 +POSTHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000 +PREHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000 +POSTHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000 +238 val_238 238 val_238 Index: ql/src/test/queries/clientnegative/script_broken_pipe1.q =================================================================== --- ql/src/test/queries/clientnegative/script_broken_pipe1.q (revision 0) +++ ql/src/test/queries/clientnegative/script_broken_pipe1.q (revision 0) @@ -0,0 +1,2 @@ +-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data +SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp; Index: ql/src/test/queries/clientnegative/script_broken_pipe3.q =================================================================== --- ql/src/test/queries/clientnegative/script_broken_pipe3.q (revision 0) +++ ql/src/test/queries/clientnegative/script_broken_pipe3.q (revision 0) @@ -0,0 +1,3 @@ +set 
hive.exec.script.allow.partial.consumption = true; +-- Test to ensure that a script with a bad error code still fails even with partial consumption +SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp; Index: ql/src/test/queries/clientnegative/script_broken_pipe2.q =================================================================== --- ql/src/test/queries/clientnegative/script_broken_pipe2.q (revision 0) +++ ql/src/test/queries/clientnegative/script_broken_pipe2.q (revision 0) @@ -0,0 +1,2 @@ +-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer +SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src; Index: ql/src/test/queries/clientpositive/script_pipe.q =================================================================== --- ql/src/test/queries/clientpositive/script_pipe.q (revision 0) +++ ql/src/test/queries/clientpositive/script_pipe.q (revision 0) @@ -0,0 +1,8 @@ +set hive.exec.script.allow.partial.consumption = true; +-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data +EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp; +-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer +EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src; + +SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp; +SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (revision 1872) +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (working copy)
@@ -70,6 +70,9 @@
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter;
 
+  /** Exception message that isBrokenPipeException() matches, ignoring case. */
+  static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
+
   /**
    * Timer to send periodic reports back to the tracker.
    */
@@ -270,6 +273,36 @@
     }
   }
 
+  /**
+   * Tells whether an I/O failure is the "Broken pipe" error.
+   *
+   * @param e the exception caught while writing to, or closing, scriptOutWriter
+   * @return true if the exception message equals "Broken pipe", ignoring case;
+   *         false otherwise, including when the exception carries no message
+   */
+  boolean isBrokenPipeException(IOException e) {
+    // Constant-first comparison: getMessage() may be null, and a
+    // NullPointerException thrown from a catch block would hide the real error.
+    return IO_EXCEPTION_BROKEN_PIPE_STRING.equalsIgnoreCase(e.getMessage());
+  }
+
+  /**
+   * Reads the hive.exec.script.allow.partial.consumption setting from hconf.
+   *
+   * @return true if a script may exit without consuming all of its input
+   */
+  boolean allowPartialConsumption() {
+    return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.ALLOWPARTIALCONSUMP);
+  }
+
+  /**
+   * Logs that unconsumed input is treated as an error and names the setting that relaxes it.
+   */
+  void displayBrokenPipeInfo() {
+    LOG.info("The script did not consume all input data. This is considered as an error.");
+    LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+  }
+
   public void processOp(Object row, int tag) throws HiveException {
 
     if(scriptError != null) {
@@ -285,9 +318,17 @@
       serialize_error_count.set(serialize_error_count.get() + 1);
       throw new HiveException(e);
     } catch (IOException e) {
-      LOG.error("Error in writing to script: " + e.getMessage());
-      scriptError = e;
-      throw new HiveException(e);
+      if(isBrokenPipeException(e) && allowPartialConsumption()) {
+        setDone(true);
+        LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+      } else {
+        LOG.error("Error in writing to script: " + e.getMessage());
+        if(isBrokenPipeException(e)) {
+          displayBrokenPipeInfo();
+        }
+        scriptError = e;
+        throw new HiveException(e);
+      }
     }
   }
 
@@ -300,7 +341,18 @@
       }
       // everything ok. try normal shutdown
       try {
-        scriptOutWriter.close();
+        try {
+          scriptOutWriter.close();
+        } catch (IOException e) {
+          if(isBrokenPipeException(e) && allowPartialConsumption()) {
+            LOG.warn("Got broken pipe: ignoring exception");
+          } else {
+            if(isBrokenPipeException(e)) {
+              displayBrokenPipeInfo();
+            }
+            throw e;
+          }
+        }
         int exitVal = scriptPid.waitFor();
         if (exitVal != 0) {
           LOG.error("Script failed with code " + exitVal);