diff --git build-common.xml build-common.xml
index 5ffd960..4f4afd7 100644
--- build-common.xml
+++ build-common.xml
@@ -59,7 +59,7 @@
-
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
index 1bb96db..2bb88cc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.Exception;
+import java.net.MalformedURLException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -106,10 +108,6 @@ public class HadoopJobExecHelper {
     return "Ended Job = " + jobId;
   }
 
-  private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
-    return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&all=true";
-  }
-
   public boolean mapStarted() {
     return mapProgress > 0;
   }
@@ -495,7 +493,8 @@ public class HadoopJobExecHelper {
   }
 
   @SuppressWarnings("deprecation")
-  private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
+  private void showJobFailDebugInfo(JobConf conf, RunningJob rj)
+      throws IOException, MalformedURLException {
     // Mapping from task ID to the number of failures
     Map failures = new HashMap();
     // Successful task ID's
@@ -544,7 +543,11 @@
         }
         // These tasks should have come from the same job.
         assert (ti.getJobId() != null && ti.getJobId().equals(jobId));
-        ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
+        String taskAttemptLogUrl = ShimLoader.getHadoopShims().getTaskAttemptLogUrl(
+            conf, t.getTaskTrackerHttp(), t.getTaskId());
+        if (taskAttemptLogUrl != null) {
+          ti.getLogUrls().add(taskAttemptLogUrl);
+        }
 
         // If a task failed, then keep track of the total number of failures
         // for that task (typically, a task gets re-run up to 4 times if it
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JobDebugger.java ql/src/java/org/apache/hadoop/hive/ql/exec/JobDebugger.java
index a25e5f9..9f9971d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JobDebugger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JobDebugger.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
+import java.lang.Exception;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -118,9 +120,6 @@ public class JobDebugger implements Runnable {
       console.printError(e.getMessage());
     }
   }
-  private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
-    return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&start=-8193";
-  }
 
   public static int extractErrorCode(String[] diagnostics) {
     int result = 0;
@@ -141,12 +140,12 @@
   public void run() {
     try {
       getTaskInfos();
-    } catch (IOException e) {
+    } catch (Exception e) {
       console.printError(e.getMessage());
     }
   }
 
-  private void getTaskInfos() throws IOException {
+  private void getTaskInfos() throws IOException, MalformedURLException {
     int startIndex = 0;
     while (true) {
       TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
@@ -184,7 +183,11 @@
         }
         // These tasks should have come from the same job.
         assert (ti.getJobId() != null && ti.getJobId().equals(jobId));
-        ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
+        String taskAttemptLogUrl = ShimLoader.getHadoopShims().getTaskAttemptLogUrl(
+            conf, t.getTaskTrackerHttp(), t.getTaskId());
+        if (taskAttemptLogUrl != null) {
+          ti.getLogUrls().add(taskAttemptLogUrl);
+        }
 
         // If a task failed, fetch its error code (if available).
         // Also keep track of the total number of failures for that
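Both hunks above introduce the same guarded lookup. A hypothetical shared helper — shown only to summarize the pattern, using the `TaskInfo`, `conf`, and `TaskCompletionEvent` names from the surrounding code; it is not part of the patch — would look roughly like this:

```java
// Illustrative only -- not part of the patch. Mirrors the guarded lookup that
// HadoopJobExecHelper.showJobFailDebugInfo() and JobDebugger.getTaskInfos() now share.
private static void addTaskAttemptLogUrl(TaskInfo ti, JobConf conf, TaskCompletionEvent t)
    throws MalformedURLException {
  String taskAttemptLogUrl = ShimLoader.getHadoopShims().getTaskAttemptLogUrl(
      conf, t.getTaskTrackerHttp(), t.getTaskId());
  // A null URL (e.g. when the shim detects MR2/YARN) simply means no log link is
  // recorded, instead of failing the whole debug pass.
  if (taskAttemptLogUrl != null) {
    ti.getLogUrls().add(taskAttemptLogUrl);
  }
}
```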
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/errors/TaskLogProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/errors/TaskLogProcessor.java
index c5862d0..0917c43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/errors/TaskLogProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/errors/TaskLogProcessor.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -40,6 +42,7 @@
  */
 public class TaskLogProcessor {
 
+  private final Log LOG = LogFactory.getLog(TaskLogProcessor.class);
   private final Map heuristics = new HashMap();
   private final List taskLogUrls = new ArrayList();
 
@@ -107,15 +110,16 @@
    */
   public List getErrors() {
 
-    for(String urlString : taskLogUrls) {
+    for (String urlString : taskLogUrls) {
 
       // Open the log file, and read in a line. Then feed the line into
       // each of the ErrorHeuristics. Repeat for all the lines in the log.
       URL taskAttemptLogUrl;
       try {
         taskAttemptLogUrl = new URL(urlString);
-      } catch(MalformedURLException e) {
-        throw new RuntimeException("Bad task log url", e);
+      } catch (MalformedURLException e) {
+        LOG.error("Bad task log URL", e);
+        continue;
       }
       BufferedReader in;
       try {
@@ -129,19 +133,20 @@
         }
         in.close();
       } catch (IOException e) {
-        throw new RuntimeException("Error while reading from task log url", e);
+        LOG.error("Error while reading from task log URL", e);
+        continue;
       }
 
       // Once the lines of the log file have been fed into the ErrorHeuristics,
       // see if they have detected anything. If any has, record
       // what ErrorAndSolution it gave so we can later return the most
       // frequently occurring error
-      for(Entry ent : heuristics.entrySet()) {
+      for (Entry ent : heuristics.entrySet()) {
         ErrorHeuristic eh = ent.getKey();
         HeuristicStats hs = ent.getValue();
 
         ErrorAndSolution es = eh.getErrorAndSolution();
-        if(es != null) {
+        if (es != null) {
           hs.incTriggerCount();
           hs.addErrorAndSolution(es);
         }
@@ -151,16 +156,16 @@
 
     // Return the errors that occur the most frequently
     int max = 0;
-    for(HeuristicStats hs : heuristics.values()) {
+    for (HeuristicStats hs : heuristics.values()) {
       if(hs.getTriggerCount() > max) {
         max = hs.getTriggerCount();
       }
     }
 
     List errors = new ArrayList();
-    for(HeuristicStats hs : heuristics.values()) {
-      if(hs.getTriggerCount() == max) {
-        if(hs.getErrorAndSolutions().size() > 0) {
+    for (HeuristicStats hs : heuristics.values()) {
+      if (hs.getTriggerCount() == max) {
+        if (hs.getErrorAndSolutions().size() > 0) {
           // An error heuristic could have generated different ErrorAndSolution
           // for each task attempt, but most likely they are the same. Plus,
           // one of those is probably good enough for debugging
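The TaskLogProcessor change swaps a RuntimeException for log-and-continue, so one bad or unreachable URL no longer aborts error analysis for every other task attempt. A small self-contained sketch of that policy (the URLs below are invented for the example):

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the log-and-continue policy adopted above.
public class SkipBadUrls {
  public static void main(String[] args) {
    List<String> urls = Arrays.asList(
        "http://tracker-a:50060/tasklog?attemptid=attempt_200901010000_0001_m_000000_0",
        "not a url",   // malformed on purpose
        "http://tracker-b:50060/tasklog?attemptid=attempt_200901010000_0001_m_000001_0");
    for (String s : urls) {
      URL u;
      try {
        u = new URL(s);
      } catch (MalformedURLException e) {
        // Previously this would have been a RuntimeException that stopped the loop.
        System.err.println("Bad task log URL, skipping: " + s);
        continue;
      }
      System.out.println("Would read task log from " + u);
    }
  }
}
```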
diff --git ql/src/test/org/apache/hadoop/hive/ql/udf/generic/GenericUDFEvaluateNPE.java ql/src/test/org/apache/hadoop/hive/ql/udf/generic/GenericUDFEvaluateNPE.java
new file mode 100644
index 0000000..a407982
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/udf/generic/GenericUDFEvaluateNPE.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.lang.NullPointerException;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * GenericUDFEvaluateNPE.
+ * This UDF throws a NullPointerException from evaluate().
+ * It is used to test Hive's failure handling.
+ */
+@Description(name = "evaluate_npe", value = "_FUNC_(string) - "
+    + "Throws an NPE in the GenericUDF.evaluate() method. "
+    + "Used for testing GenericUDF error handling.")
+public class GenericUDFEvaluateNPE extends GenericUDF {
+  private ObjectInspector[] argumentOIs;
+  private final Text result = new Text();
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments)
+      throws UDFArgumentException {
+    if (arguments.length != 1) {
+      throw new UDFArgumentLengthException(
+          "The function evaluate_npe(string) "
+          + "needs only one argument.");
+    }
+
+    if (!arguments[0].getTypeName().equals(Constants.STRING_TYPE_NAME)) {
+      throw new UDFArgumentTypeException(0,
+          "Argument 1 of function evaluate_npe must be \""
+          + Constants.STRING_TYPE_NAME + "\", but \""
+          + arguments[0].getTypeName() + "\" was found.");
+    }
+
+    argumentOIs = arguments;
+    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    if (true) {
+      throw new NullPointerException("evaluate null pointer exception");
+    }
+    return result;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    assert (children.length == 1);
+    return "evaluate_npe(" + children[0] + ")";
+  }
+}
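A hypothetical quick check of the new test UDF (not part of the patch; the real coverage comes from the negative .q file below). It assumes the Hive test classpath and the single-method `DeferredObject` interface (`Object get()`) used by this codebase:

```java
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFEvaluateNPE;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical quick check -- drives the UDF directly and confirms that evaluate()
// throws, which is what makes the negative test fail on purpose.
public class GenericUDFEvaluateNPEQuickCheck {
  public static void main(String[] args) throws Exception {
    GenericUDFEvaluateNPE udf = new GenericUDFEvaluateNPE();
    udf.initialize(new ObjectInspector[] {
        PrimitiveObjectInspectorFactory.javaStringObjectInspector});
    GenericUDF.DeferredObject arg = new GenericUDF.DeferredObject() {
      public Object get() {
        return "238";  // any string value, e.g. a src.key, will do
      }
    };
    try {
      udf.evaluate(new GenericUDF.DeferredObject[] {arg});
      System.out.println("Unexpected: evaluate() returned normally");
    } catch (NullPointerException expected) {
      System.out.println("Got the expected NullPointerException");
    }
  }
}
```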
" + + "Used for testing GenericUDF error handling.") +public class GenericUDFEvaluateNPE extends GenericUDF { + private ObjectInspector[] argumentOIs; + private final Text result= new Text(); + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + if (arguments.length != 1) { + throw new UDFArgumentLengthException( + "The function evaluate_npe(string)" + + "needs only one argument."); + } + + if (!arguments[0].getTypeName().equals(Constants.STRING_TYPE_NAME)) { + throw new UDFArgumentTypeException(0, + "Argument 1 of function evaluate_npe must be \"" + + Constants.STRING_TYPE_NAME + "but \"" + + arguments[0].getTypeName() + "\" was found."); + } + + argumentOIs = arguments; + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + if (true) { + throw new NullPointerException("evaluate null pointer exception"); + } + return result; + } + + @Override + public String getDisplayString(String[] children) { + assert (children.length == 1); + return "evaluate_npe(" + children[0] + ")"; + } +} diff --git ql/src/test/queries/clientnegative/cluster_tasklog_retrieval.q ql/src/test/queries/clientnegative/cluster_tasklog_retrieval.q new file mode 100644 index 0000000..bc98044 --- /dev/null +++ ql/src/test/queries/clientnegative/cluster_tasklog_retrieval.q @@ -0,0 +1,6 @@ +-- TaskLog retrieval upon Null Pointer Exception in Cluster + +CREATE TEMPORARY FUNCTION evaluate_npe AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFEvaluateNPE'; + +FROM src +SELECT evaluate_npe(src.key) LIMIT 1; diff --git ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out new file mode 100644 index 0000000..457980a --- /dev/null +++ ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out @@ -0,0 +1,14 @@ +PREHOOK: query: -- TaskLog retrieval upon Null Pointer Exception in Cluster + +CREATE TEMPORARY FUNCTION evaluate_npe AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFEvaluateNPE' +PREHOOK: type: CREATEFUNCTION +POSTHOOK: query: -- TaskLog retrieval upon Null Pointer Exception in Cluster + +CREATE TEMPORARY FUNCTION evaluate_npe AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFEvaluateNPE' +POSTHOOK: type: CREATEFUNCTION +PREHOOK: query: FROM src +SELECT evaluate_npe(src.key) LIMIT 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index f7fd76e..a24e183 100644 --- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -21,6 +21,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.MalformedURLException; +import java.net.URL; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -51,6 +53,7 @@ import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapred.TaskLogServlet; import 
diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index f7fd76e..a24e183 100644
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -51,6 +53,7 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.TaskLogServlet;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -531,6 +534,17 @@ public class Hadoop20Shims implements HadoopShims {
   }
 
   @Override
+  public String getTaskAttemptLogUrl(JobConf conf,
+      String taskTrackerHttpAddress, String taskAttemptId)
+      throws MalformedURLException {
+    URL taskTrackerHttpURL = new URL(taskTrackerHttpAddress);
+    return TaskLogServlet.getTaskLogUrl(
+        taskTrackerHttpURL.getHost(),
+        Integer.toString(taskTrackerHttpURL.getPort()),
+        taskAttemptId);
+  }
+
+  @Override
   public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
     JobTrackerState state;
     switch (clusterStatus.getJobTrackerState()) {
diff --git shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 563b316..12bc4e9 100644
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hive.shims;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskLogServlet;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
@@ -30,6 +35,17 @@ import org.apache.hadoop.util.Progressable;
 public class Hadoop20SShims extends HadoopShimsSecure {
 
   @Override
+  public String getTaskAttemptLogUrl(JobConf conf,
+      String taskTrackerHttpAddress, String taskAttemptId)
+      throws MalformedURLException {
+    URL taskTrackerHttpURL = new URL(taskTrackerHttpAddress);
+    return TaskLogServlet.getTaskLogUrl(
+        taskTrackerHttpURL.getHost(),
+        Integer.toString(taskTrackerHttpURL.getPort()),
+        taskAttemptId);
+  }
+
+  @Override
   public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
     JobTrackerState state;
     switch (clusterStatus.getJobTrackerState()) {
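Both MR1 shims above rely on java.net.URL to split the task tracker's HTTP address before handing it to TaskLogServlet.getTaskLogUrl(). A tiny standalone sketch of that parsing (the tracker address is invented for the example):

```java
import java.net.MalformedURLException;
import java.net.URL;

// Standalone sketch of the address parsing used by Hadoop20Shims/Hadoop20SShims above.
public class TrackerAddressParsing {
  public static void main(String[] args) throws MalformedURLException {
    URL taskTrackerHttpURL = new URL("http://tracker-node.example.com:50060");
    String host = taskTrackerHttpURL.getHost();                    // tracker-node.example.com
    String port = Integer.toString(taskTrackerHttpURL.getPort());  // 50060
    // TaskLogServlet.getTaskLogUrl(host, port, taskAttemptId) then builds the servlet
    // URL, roughly http://<host>:<port>/tasklog?attemptid=<taskAttemptId>.
    System.out.println(host + ":" + port);
  }
}
```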
(conf.get("mapreduce.framework.name") != null + && conf.get("mapreduce.framework.name").equals("yarn")) { + // if the cluster is running in MR2 mode, return null + LOG.warn("Can't fetch tasklog: TaskLogServlet is not supported in MR2 mode."); + return null; + } else { + // if the cluster is running in MR1 mode, using HostUtil to construct TaskLogURL + URL taskTrackerHttpURL = new URL(taskTrackerHttpAddress); + return HostUtil.getTaskLogUrl(taskTrackerHttpURL.getHost(), + Integer.toString(taskTrackerHttpURL.getPort()), + taskAttemptId); + } + } + + @Override public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception { JobTrackerState state; switch (clusterStatus.getJobTrackerStatus()) { diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java index 7b0c410..ee38781 100644 --- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.net.MalformedURLException; import java.security.PrivilegedExceptionAction; import java.util.List; @@ -66,6 +67,17 @@ public interface HadoopShims { boolean usesJobShell(); /** + * Constructs and Returns TaskAttempt Log Url + * or null if the TaskLogServlet is not available + * + * @return TaskAttempt Log Url + */ + String getTaskAttemptLogUrl(JobConf conf, + String taskTrackerHttpAddress, + String taskAttemptId) + throws MalformedURLException; + + /** * Return true if the job has not switched to RUNNING state yet * and is still in PREP state */