Index: ql/src/test/results/clientnegative/udf_test_error.q.out
===================================================================
--- ql/src/test/results/clientnegative/udf_test_error.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/udf_test_error.q.out	(revision 0)
@@ -0,0 +1,9 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse'
+PREHOOK: type: CREATEFUNCTION
+POSTHOOK: query: CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse'
+POSTHOOK: type: CREATEFUNCTION
+PREHOOK: query: SELECT test_error(key < 125 OR key > 130) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk/build/ql/scratchdir/hive_2010-03-06_00-58-40_004_2624763517220611615/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/results/clientnegative/udf_test_error_reduce.q.out
===================================================================
--- ql/src/test/results/clientnegative/udf_test_error_reduce.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/udf_test_error_reduce.q.out	(revision 0)
@@ -0,0 +1,14 @@
+PREHOOK: query: CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse'
+PREHOOK: type: CREATEFUNCTION
+POSTHOOK: query: CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse'
+POSTHOOK: type: CREATEFUNCTION
+PREHOOK: query: SELECT test_error(key < 125 OR key > 130)
+FROM (
+  SELECT *
+  FROM src
+  DISTRIBUTE BY rand()
+) map_output
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk/build/ql/scratchdir/hive_2010-03-05_23-12-16_809_4809554819212794550/10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
Index: ql/src/test/org/apache/hadoop/hive/ql/udf/UDFTestErrorOnFalse.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/udf/UDFTestErrorOnFalse.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/udf/UDFTestErrorOnFalse.java	(revision 0)
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+/**
+ * A UDF for testing, which throws a RuntimeException if the boolean argument is false.
+ */
+public class UDFTestErrorOnFalse extends UDF {
+
+  public int evaluate(Boolean b) {
+    if (b) {
+      return 1;
+    } else {
+      throw new RuntimeException("UDFTestErrorOnFalse got b=false");
+    }
+  }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java	(revision 920542)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java	(working copy)
@@ -251,6 +251,7 @@
       deleteDirectory(new File(warehousePath, s));
     }
     FunctionRegistry.unregisterTemporaryUDF("test_udaf");
+    FunctionRegistry.unregisterTemporaryUDF("test_error");
   }
 
   private void runLoadCmd(String loadCmd) throws Exception {
Index: ql/src/test/queries/clientnegative/udf_test_error_reduce.q
===================================================================
--- ql/src/test/queries/clientnegative/udf_test_error_reduce.q	(revision 0)
+++ ql/src/test/queries/clientnegative/udf_test_error_reduce.q	(revision 0)
@@ -0,0 +1,11 @@
+CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse';
+
+
+SELECT test_error(key < 125 OR key > 130)
+FROM (
+  SELECT *
+  FROM src
+  DISTRIBUTE BY rand()
+) map_output;
+
+
Index: ql/src/test/queries/clientnegative/udf_test_error.q
===================================================================
--- ql/src/test/queries/clientnegative/udf_test_error.q	(revision 0)
+++ ql/src/test/queries/clientnegative/udf_test_error.q	(revision 0)
@@ -0,0 +1,3 @@
+CREATE TEMPORARY FUNCTION test_error AS 'org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse';
+
+SELECT test_error(key < 125 OR key > 130) FROM src;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(revision 920542)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Map operator. This triggers overall map side processing. This is a little
@@ -371,19 +373,49 @@
   }
 
   public void process(Writable value) throws HiveException {
+    Object row = null;
     try {
       if (!isPartitioned) {
-        Object row = deserializer.deserialize(value);
-        forward(row, rowObjectInspector);
+        row = deserializer.deserialize(value);
       } else {
         rowWithPart[0] = deserializer.deserialize(value);
-        forward(rowWithPart, rowObjectInspector);
       }
-    } catch (SerDeException e) {
+    } catch (Exception e) {
+      // Serialize the row and output.
+      String rawRowString;
+      try {
+        rawRowString = value.toString();
+      } catch (Exception e2) {
+        rawRowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
+      }
+
       // TODO: policy on deserialization errors
       deserialize_error_count.set(deserialize_error_count.get() + 1);
-      throw new HiveException(e);
+      throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
     }
+
+    try {
+      if (!isPartitioned) {
+        forward(row, rowObjectInspector);
+      } else {
+        forward(rowWithPart, rowObjectInspector);
+      }
+    } catch (Exception e) {
+      // Serialize the row and output the error message.
+      String rowString;
+      try {
+        if (!isPartitioned) {
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector);
+        } else {
+          rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
+        }
+      } catch (Exception e2) {
+        rowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
+      }
+      throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
+    }
   }
 
   @Override
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java	(revision 920542)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java	(working copy)
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * ExecMapper.
@@ -198,12 +199,12 @@
       }
     } catch (Throwable e) {
       abort = true;
-      e.printStackTrace();
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException(e.getMessage(), e);
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
       }
     }
   }
@@ -277,7 +278,7 @@
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException("Map local work failed", e);
+        throw new RuntimeException("Hive Runtime Error: Map local work failed", e);
       }
     }
   }
@@ -342,7 +343,7 @@
       if (!abort) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException("Error while closing operators", e);
+        throw new RuntimeException("Hive Runtime Error while closing operators", e);
       }
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java	(revision 920542)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java	(working copy)
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * ExecReducer.
@@ -84,9 +86,11 @@
   TableDesc keyTableDesc;
   TableDesc[] valueTableDesc;
 
+  ObjectInspector[] rowObjectInspector;
+
   @Override
   public void configure(JobConf job) {
-    ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
@@ -195,7 +199,7 @@
         keyObject = inputKeyDeserializer.deserialize(keyWritable);
       } catch (Exception e) {
         throw new HiveException(
-            "Unable to deserialize reduce input key from "
+            "Hive Runtime Error: Unable to deserialize reduce input key from "
             + Utilities.formatBinaryString(keyWritable.get(), 0,
             keyWritable.getSize()) + " with properties "
             + keyTableDesc.getProperties(), e);
@@ -215,7 +219,7 @@
             .deserialize(valueWritable);
       } catch (SerDeException e) {
         throw new HiveException(
-            "Unable to deserialize reduce input value (tag="
+            "Hive Runtime Error: Unable to deserialize reduce input value (tag="
             + tag.get() + ") from "
             + Utilities.formatBinaryString(valueWritable.get(), 0,
             valueWritable.getSize()) + " with properties "
@@ -236,7 +240,19 @@
             nextCntr = getNextCntr(cntr);
           }
         }
-        reducer.process(row, tag.get());
+        try {
+          reducer.process(row, tag.get());
+        } catch (Exception e) {
+          String rowString = null;
+          try {
+            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag.get()]);
+          } catch (Exception e2) {
+            rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+          }
+          throw new HiveException("Hive Runtime Error while processing row (tag="
+              + tag.get() + ") " + rowString, e);
+        }
       }
 
     } catch (Throwable e) {
@@ -245,7 +261,8 @@
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        throw new IOException(e);
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
       }
     }
   }
@@ -283,12 +300,12 @@
       reducer.close(abort);
       reportStats rps = new reportStats(rp);
       reducer.preorderMap(rps);
-      return;
+
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException("Error while closing operators: "
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
             + e.getMessage(), e);
       }
     }