Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java	(revision 729179)
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.commons.logging.Log;
@@ -255,6 +256,7 @@
     if(! oproc.process_stage2(ss)) {
       System.exit(2);
     }
+
     // set all properties specified via command line
     HiveConf conf = ss.getConf();
@@ -262,6 +264,8 @@
       conf.set((String) item.getKey(), (String) item.getValue());
     }
 
+    HiveHistory hist = HiveHistory.get();
+    hist.startSession();
     CliDriver cli = new CliDriver ();
     if(ss.execString != null) {
@@ -294,6 +298,7 @@
     PrintWriter out = new PrintWriter(System.out);
     final String HISTORYFILE = ".hivehistory";
     String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE;
+
     reader.setHistory(new History(new File(historyFile)));
     int ret = 0;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 729179)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -40,6 +40,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.exec.MapOperator.Counter;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -168,6 +171,7 @@
       console.printInfo("Job running in-process (local Hadoop)");
     } else {
       String hp = job.get("mapred.job.tracker");
+      HiveHistory.get().setTaskProperty(getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
       console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + rj.getTrackingURL());
       console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) +
@@ -192,7 +196,11 @@
       report = " map = " + Math.round(rj.mapProgress() * 100) + "%, reduce =" + Math.round(rj.reduceProgress() * 100) + "%";
+      HiveHistory.get().setTaskCounters(getId(), rj);
       if (!report.equals(lastReport)) {
+        HiveHistory.get().setTaskProperty(getId(), Keys.TASK_HADOOP_PROGRESS, report);
+        HiveHistory.get().progressTask(this);
+
         console.printInfo(report);
         lastReport = report;
       }
@@ -221,8 +229,10 @@
       int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
       if (newRed < work.getNumReduceTasks().intValue()) {
+        HiveHistory.get().setTaskProperty(getId(), Keys.TASK_NUM_REDUCE_TASKS, String.valueOf(newRed));
         LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
         work.setNumReduceTasks(Integer.valueOf(newRed));
+
       }
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+
+public class HiveHistory {
+
+  PrintWriter histStream; // History File stream
+
+  static HiveHistory hiveHist = null; // Singleton instance
+
+  static final private Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
+
+  private LogHelper console;
+
+  private SessionInfo sessionInfo = new SessionInfo();
+
+  // Job Hash Map
+  private HashMap<String, JobInfo> jobInfoMap = new HashMap<String, JobInfo>();
+
+  // Task Hash Map
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  private static final String DELIMITER = " ";
+
+  public static enum RecordTypes {
+    JobStart, JobEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+  };
+
+  public static enum Keys {
+    SESSION_ID, JOB_ID, TASK_ID, JOB_STATUS, TASK_COUNTERS, JOB_RET_CODE, JOB_ERROR_MSG, JOB_NUM_TASKS, HAS_REDUCE_TASKS, TASK_STATUS, TASK_RET_CODE, TASK_NAME, TASK_NUM_REDUCE_TASKS, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS
+  };
+
+  static class Info {
+
+  }
+
+  static class SessionInfo extends Info {
+    public String sessionId;
+  };
+
+  static class JobInfo extends Info {
+    public HashMap<String, String> hm = new HashMap<String, String>();
+  };
+
+  static class TaskInfo extends Info {
+    public HashMap<String, String> hm = new HashMap<String, String>();
+
+  };
+
+  public HiveHistory() {
+    console = new LogHelper(LOG);
+  }
+
+  public static HiveHistory get() {
+    if (hiveHist != null)
+      return hiveHist;
+    hiveHist = new HiveHistory();
+
+    return hiveHist;
+  }
+
+  /**
+   * Write a history record to the history file.
+   *
+   * @param rt
+   * @param keyValMap
+   */
+  void log(RecordTypes rt, Map<String, String> keyValMap) {
+    StringBuffer sb = new StringBuffer();
+    sb.append(rt.name());
+
+    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+
+      sb.append(DELIMITER);
+      String key = ent.getKey();
+      String val = ent.getValue();
+      sb.append(key + "=\"" + val + "\"");
+
+    }
+    histStream.println(sb);
+    histStream.flush();
+
+  }
+
+  /**
+   * CliDriver calls this function at the start of a session.
+   *
+   * @throws IOException
+   */
+  public void startSession() throws IOException {
+    SessionState ss = SessionState.get();
+
+    if (ss != null) {
+      sessionInfo.sessionId = ss.getSessionId();
+    }
+
+    String file = "/tmp/hive_history_" + sessionInfo.sessionId + ".txt";
+    console.printInfo("Hive history file=" + file);
+    histStream = new PrintWriter(file);
+
+    HashedMap hm = new HashedMap();
+    hm.put(Keys.SESSION_ID.name(), sessionInfo.sessionId);
+    log(RecordTypes.SessionStart, hm);
+  }
+
+  /**
+   * CliDriver calls this function at the end of a session.
+   *
+   */
+  public void endSession() {
+
+    HashedMap hm = new HashedMap();
+    hm.put(Keys.SESSION_ID.name(), sessionInfo.sessionId);
+    log(RecordTypes.SessionEnd, hm);
+    histStream.close();
+  }
+
+  /**
+   * Called by Driver.run() at the start of a job.
+   */
+  public void startJob() {
+    SessionState ss = SessionState.get();
+    JobInfo ji = new JobInfo();
+
+    ji.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    jobInfoMap.put(ss.getCmd(), ji);
+
+    log(RecordTypes.JobStart, ji.hm);
+
+  }
+
+  /**
+   * Used to set the job status and other attributes of a job.
+   *
+   * @param jobId
+   * @param propName
+   * @param propValue
+   */
+  public void setJobProperty(String jobId, Keys propName, String propValue) {
+    JobInfo ji = jobInfoMap.get(jobId);
+    ji.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Used to set task properties.
+   *
+   * @param taskId
+   * @param propName
+   * @param propValue
+   */
+  public void setTaskProperty(String taskId, Keys propName, String propValue) {
+    TaskInfo ti = taskInfoMap.get(taskId);
+    ti.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Serialize the task counters and set them as a task property.
+   *
+   * @param taskId
+   * @param rj
+   */
+  public void setTaskCounters(String taskId, RunningJob rj) {
+    StringBuilder sb = new StringBuilder("");
+    try {
+
+      boolean first = true;
+      for (Group group : rj.getCounters()) {
+        for (Counter counter : group) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(',');
+          }
+          sb.append(group.getDisplayName());
+          sb.append('.');
+          sb.append(counter.getDisplayName());
+          sb.append(':');
+          sb.append(counter.getCounter());
+        }
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    taskInfoMap.get(taskId).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+  }
+
+  /**
+   * Called at the end of a job. A job is a single SQL query.
+   *
+   * @param jobid
+   */
+  public void endJob(String jobid) {
+
+    JobInfo ji = jobInfoMap.get(jobid);
+    log(RecordTypes.JobEnd, ji.hm);
+  }
+
+  /**
+   * Called by Driver.run() at the start of a task. A job can have multiple
+   * tasks, and each task can have multiple operators.
+   *
+   * @param task
+   */
+  public void startTask(Task<? extends Serializable> task) {
+    SessionState ss = SessionState.get();
+    TaskInfo ti = new TaskInfo();
+
+    ti.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    ti.hm.put(Keys.TASK_ID.name(), task.getId());
+
+    taskInfoMap.put(task.getId(), ti);
+
+    log(RecordTypes.TaskStart, ti.hm);
+
+  }
+
+  /**
+   * Called at the end of a task.
+   *
+   * @param task
+   */
+  public void endTask(Task<? extends Serializable> task) {
+    TaskInfo ti = taskInfoMap.get(task.getId());
+    log(RecordTypes.TaskEnd, ti.hm);
+  }
+
+  /**
+   * Called to record the progress of a task.
+   *
+   * @param task
+   */
+  public void progressTask(Task<? extends Serializable> task) {
+    TaskInfo ti = taskInfoMap.get(task.getId());
+    log(RecordTypes.TaskProgress, ti.hm);
+
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 729179)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde.ByteStream;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -150,6 +152,7 @@
       conf.setVar(HiveConf.ConfVars.HIVEQUERYID, command);
 
+    HiveHistory.get().startJob();
     try {
 
       TaskFactory.resetId();
@@ -176,9 +179,13 @@
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
+      HiveHistory.get().setJobProperty(command, Keys.JOB_NUM_TASKS, String.valueOf(jobs));
+
       boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+      HiveHistory.get().setJobProperty(command, Keys.HAS_REDUCE_TASKS, String.valueOf(hasReduce));
       if (hasReduce) {
+        HiveHistory.get().setJobProperty(command, Keys.TASK_NUM_REDUCE_TASKS, String.valueOf(conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS)));
         console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
         console.printInfo("In order to change numer of reducers use:");
         console.printInfo("  set mapred.reduce.tasks = <number>");
@@ -212,12 +219,16 @@
       while(runnable.peek() != null) {
         Task<? extends Serializable> tsk = runnable.remove();
 
+        HiveHistory.get().startTask(tsk);
+
         int exitVal = tsk.execute();
-        if (exitVal != 0) {
+        HiveHistory.get().setTaskProperty(tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
+        HiveHistory.get().setTaskProperty(tsk.getId(), Keys.TASK_NAME, tsk.getClass().getName());
+        HiveHistory.get().endTask(tsk);
+        if (exitVal != 0) {
           console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName());
           return 9;
         }
-
        tsk.setDone();
 
        if (tsk.getChildTasks() == null) {
@@ -235,23 +246,30 @@
          }
        }
      }
+      HiveHistory.get().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(0));
     } catch (SemanticException e) {
+      HiveHistory.get().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(10));
       console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (10);
     } catch (ParseException e) {
+
+      HiveHistory.get().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(11));
       console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (11);
     } catch (Exception e) {
+      HiveHistory.get().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(12));
       // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
       console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (12);
     } finally {
+      HiveHistory.get().endJob(command);
       if(noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
       }
     }
-
+
+    console.printInfo("OK");
     return (0);
   }
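
For reference, every line that HiveHistory.log() writes to /tmp/hive_history_<session id>.txt is the record-type name followed by space-delimited KEY="value" pairs. The sketch below is a minimal, hypothetical consumer of that format; it is not part of the patch. The class name, the sample line, and the key ordering in it are illustrative assumptions, and because log() does not escape embedded double quotes, the parser assumes values contain none.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the patch): parses one record written by
// HiveHistory.log(), i.e. a record-type token followed by KEY="value" pairs
// separated by single spaces. Assumes values contain no embedded double quotes.
public class HiveHistoryRecordParser {

  private static final Pattern KV = Pattern.compile("(\\w+)=\"([^\"]*)\"");

  public static Map<String, String> parse(String line) {
    Map<String, String> rec = new LinkedHashMap<String, String>();
    int space = line.indexOf(' ');
    // Everything before the first delimiter is the RecordTypes name.
    rec.put("RecordType", space < 0 ? line : line.substring(0, space));
    Matcher m = KV.matcher(line);
    while (m.find()) {
      rec.put(m.group(1), m.group(2));
    }
    return rec;
  }

  public static void main(String[] args) {
    // Sample record; the task id and hadoop job id are made-up values.
    String line = "TaskProgress TASK_ID=\"Stage-1\" TASK_HADOOP_ID=\"job_200901010000_0001\"";
    System.out.println(parse(line));
  }
}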