Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1872)
+++ conf/hive-default.xml (working copy)
@@ -280,6 +280,13 @@
+ hive.exec.script.allow.partial.consumption
+ false
+ When enabled, this option allows a user script to exit successfully without consuming all the data from the standard input.
+
+
+
+
hive.exec.compress.output
false
This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress*
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1872)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -61,6 +61,7 @@
SCRATCHDIR("hive.exec.scratchdir", "/tmp/"+System.getProperty("user.name")+"/hive"),
SUBMITVIACHILD("hive.exec.submitviachild", false),
SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
+ ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
Index: ql/src/test/results/clientnegative/script_broken_pipe2.q.out
===================================================================
--- ql/src/test/results/clientnegative/script_broken_pipe2.q.out (revision 0)
+++ ql/src/test/results/clientnegative/script_broken_pipe2.q.out (revision 0)
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/588453066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientnegative/script_broken_pipe1.q.out
===================================================================
--- ql/src/test/results/clientnegative/script_broken_pipe1.q.out (revision 0)
+++ ql/src/test/results/clientnegative/script_broken_pipe1.q.out (revision 0)
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1832401066/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientnegative/script_broken_pipe3.q.out
===================================================================
--- ql/src/test/results/clientnegative/script_broken_pipe3.q.out (revision 0)
+++ ql/src/test/results/clientnegative/script_broken_pipe3.q.out (revision 0)
@@ -0,0 +1,6 @@
+PREHOOK: query: -- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1937270363/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientpositive/script_pipe.q.out
===================================================================
--- ql/src/test/results/clientpositive/script_pipe.q.out (revision 0)
+++ ql/src/test/results/clientpositive/script_pipe.q.out (revision 0)
@@ -0,0 +1,144 @@
+PREHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 1))) tmp)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST TOK_ALLCOLREF) TOK_SERDE TOK_RECORDWRITER 'true' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ tmp:src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Limit
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Reduce Operator Tree:
+ Extract
+ Limit
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ Transform Operator
+ command: true
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value) (TOK_TABLE_OR_COL key) (TOK_TABLE_OR_COL value)) TOK_SERDE TOK_RECORDWRITER 'head -n 1' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST a b c d))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11
+ Transform Operator
+ command: head -n 1
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+POSTHOOK: query: SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/1755225837/10000
+PREHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+POSTHOOK: query: SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/trunk/VENDOR.hive/trunk/build/ql/tmp/501105556/10000
+238 val_238 238 val_238
Index: ql/src/test/queries/clientnegative/script_broken_pipe1.q
===================================================================
--- ql/src/test/queries/clientnegative/script_broken_pipe1.q (revision 0)
+++ ql/src/test/queries/clientnegative/script_broken_pipe1.q (revision 0)
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
Index: ql/src/test/queries/clientnegative/script_broken_pipe3.q
===================================================================
--- ql/src/test/queries/clientnegative/script_broken_pipe3.q (revision 0)
+++ ql/src/test/queries/clientnegative/script_broken_pipe3.q (revision 0)
@@ -0,0 +1,3 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Test to ensure that a script with a bad error code still fails even with partial consumption
+SELECT TRANSFORM(*) USING 'false' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
Index: ql/src/test/queries/clientnegative/script_broken_pipe2.q
===================================================================
--- ql/src/test/queries/clientnegative/script_broken_pipe2.q (revision 0)
+++ ql/src/test/queries/clientnegative/script_broken_pipe2.q (revision 0)
@@ -0,0 +1,2 @@
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
Index: ql/src/test/queries/clientpositive/script_pipe.q
===================================================================
--- ql/src/test/queries/clientpositive/script_pipe.q (revision 0)
+++ ql/src/test/queries/clientpositive/script_pipe.q (revision 0)
@@ -0,0 +1,8 @@
+set hive.exec.script.allow.partial.consumption = true;
+-- Tests exception in ScriptOperator.close() by passing to the operator a small amount of data
+EXPLAIN SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+-- Tests exception in ScriptOperator.processOp() by passing extra data needed to fill pipe buffer
+EXPLAIN SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
+
+SELECT TRANSFORM(*) USING 'true' AS a, b, c FROM (SELECT * FROM src LIMIT 1) tmp;
+SELECT TRANSFORM(key, value, key, value, key, value, key, value, key, value, key, value) USING 'head -n 1' as a,b,c,d FROM src;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (revision 1872)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (working copy)
@@ -70,6 +70,8 @@
transient volatile Throwable scriptError = null;
transient RecordWriter scriptOutWriter;
+ static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
+
/**
* Timer to send periodic reports back to the tracker.
*/
@@ -270,6 +272,20 @@
}
}
+ boolean isBrokenPipeException(IOException e) {
+ return (e.getMessage() != null && e.getMessage().equalsIgnoreCase(IO_EXCEPTION_BROKEN_PIPE_STRING));
+ }
+
+ boolean allowPartialConsumption() {
+ return HiveConf.getBoolVar(hconf, HiveConf.ConfVars.ALLOWPARTIALCONSUMP);
+ }
+
+ void displayBrokenPipeInfo() {
+ LOG.info("The script did not consume all input data. This is considered an error.");
+ LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+ return;
+ }
+
public void processOp(Object row, int tag) throws HiveException {
if(scriptError != null) {
@@ -285,9 +301,17 @@
serialize_error_count.set(serialize_error_count.get() + 1);
throw new HiveException(e);
} catch (IOException e) {
- LOG.error("Error in writing to script: " + e.getMessage());
- scriptError = e;
- throw new HiveException(e);
+ if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ setDone(true);
+ LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+ } else {
+ LOG.error("Error in writing to script: " + e.getMessage());
+ if(isBrokenPipeException(e)) {
+ displayBrokenPipeInfo();
+ }
+ scriptError = e;
+ throw new HiveException(e);
+ }
}
}
@@ -300,7 +324,18 @@
}
// everything ok. try normal shutdown
try {
- scriptOutWriter.close();
+ try {
+ scriptOutWriter.close();
+ } catch (IOException e) {
+ if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ LOG.warn("Got broken pipe: ignoring exception");
+ } else {
+ if(isBrokenPipeException(e)) {
+ displayBrokenPipeInfo();
+ }
+ throw e;
+ }
+ }
int exitVal = scriptPid.waitFor();
if (exitVal != 0) {
LOG.error("Script failed with code " + exitVal);