Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 731641)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -91,8 +91,12 @@
 
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
-    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile");
+    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
+
+    // Location of the Hive runtime structured log file
+    HIVEHISTORYFILELOC("hive.joblog.location", "/tmp/" + System.getProperty("user.name"));
+
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 731641)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -58,6 +59,10 @@
    */
  protected Hive db;
+
+  /*
+   * HiveHistory object
+   */
+  protected HiveHistory hiveHist;
  /**
   * Streams to read/write from
   */
@@ -177,7 +182,27 @@
    return tss.get();
  }
 
-
+
+  /**
+   * Get the HiveHistory object, which does structured logging.
+   * @return HiveHistory
+   */
+  public HiveHistory getHiveHistory() {
+    if (hiveHist == null) {
+      hiveHist = HiveHistory.get();
+    }
+
+    return hiveHist;
+  }
+
+  /**
+   * Set the HiveHistory object, which does structured logging.
+   * @param hist
+   */
+  public void setHiveHistory(HiveHistory hist) {
+    hiveHist = hist;
+  }
+
  private static String makeSessionId() {
    GregorianCalendar gc = new GregorianCalendar();
    String userid = System.getProperty("user.name");
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 731641)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -168,6 +169,8 @@
        console.printInfo("Job running in-process (local Hadoop)");
      } else {
        String hp = job.get("mapred.job.tracker");
+        SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(),
+            Keys.TASK_HADOOP_ID, rj.getJobID());
        console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + rj.getTrackingURL());
        console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) +
@@ -192,7 +195,12 @@
        report = " map = " + Math.round(rj.mapProgress() * 100) + "%, reduce =" + Math.round(rj.reduceProgress() * 100) + "%";
 
+        SessionState.get().getHiveHistory().setTaskCounters(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), rj);
        if (!report.equals(lastReport)) {
+          SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), Keys.TASK_HADOOP_PROGRESS, report);
+          SessionState.get().getHiveHistory().progressTask(conf.getVar(HiveConf.ConfVars.HIVEQUERYID),
+              this);
+
          console.printInfo(report);
          lastReport = report;
        }
@@ -221,8 +229,10 @@
        int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
 
        if (newRed < work.getNumReduceTasks().intValue()) {
+          SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), Keys.TASK_NUM_REDUCE_TASKS, String.valueOf(newRed));
          LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
          work.setNumReduceTasks(Integer.valueOf(newRed));
+
        }
      }
    }
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+
+public class HiveHistory {
+
+  PrintWriter histStream; // History file stream
+
+  private static final Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
+
+  private LogHelper console;
+
+  // Job info, keyed by job id (the query string)
+  private HashMap<String, JobInfo> jobInfoMap = new HashMap<String, JobInfo>();
+
+  // Task info, keyed by jobId:taskId
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  private static final String DELIMITER = " ";
+
+  public static enum RecordTypes {
+    JobStart, JobEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+  };
+
+  public static enum Keys {
+    SESSION_ID, JOB_ID, TASK_ID, JOB_STATUS, TASK_COUNTERS, JOB_RET_CODE, JOB_ERROR_MSG,
+    JOB_NUM_TASKS, HAS_REDUCE_TASKS, TASK_STATUS, TASK_RET_CODE, TASK_NAME,
+    TASK_NUM_REDUCE_TASKS, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS
+  };
+
+  static class Info {
+
+  }
+
+  static class SessionInfo extends Info {
+    public String sessionId;
+  };
+
+  static class JobInfo extends Info {
+    public HashMap<String, String> hm = new HashMap<String, String>();
+  };
+
+  static class TaskInfo extends Info {
+    public HashMap<String, String> hm = new HashMap<String, String>();
+  };
+
+  public HiveHistory() {
+    console = new LogHelper(LOG);
+  }
+
+  public static HiveHistory get() {
+
+    HiveHistory hiveHist = new HiveHistory();
+
+    try {
+      hiveHist.init();
+      SessionState.get().setHiveHistory(hiveHist);
+    } catch (Exception e) {
+      LOG.warn("Unable to initialize HiveHistory", e);
+    }
+    return hiveHist;
+  }
+
+  /**
+   * Write a history record to the history file.
+   *
+   * @param rt
+   * @param keyValMap
+   */
+  void log(RecordTypes rt, Map<String, String> keyValMap) {
+
+    if (histStream == null) return;
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(rt.name());
+
+    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+      sb.append(DELIMITER);
+      String key = ent.getKey();
+      String val = ent.getValue();
+      sb.append(key + "=\"" + val + "\"");
+    }
+    histStream.println(sb);
+    histStream.flush();
+  }
+
+  public void init() throws IOException {
+
+    SessionState ss = SessionState.get();
+    if (ss == null) return;
+
+    String conf_file_loc = ss.getConf().getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC);
+    String file = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + ".txt";
+    console.printInfo("Hive history file=" + file);
+    histStream = new PrintWriter(file);
+
+    HashMap<String, String> hm = new HashMap<String, String>();
+    hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
+    log(RecordTypes.SessionStart, hm);
+
+    ss.setHiveHistory(this);
+  }
+
+  /**
+   * Called at the start of a job by Driver.run().
+   */
+  public void startJob() {
+    SessionState ss = SessionState.get();
+    if (ss == null) return;
+    JobInfo ji = new JobInfo();
+
+    ji.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    jobInfoMap.put(ss.getCmd(), ji);
+
+    log(RecordTypes.JobStart, ji.hm);
+  }
+
+  /**
+   * Used to set job status and other attributes of a job.
+   *
+   * @param jobId
+   * @param propName
+   * @param propValue
+   */
+  public void setJobProperty(String jobId, Keys propName, String propValue) {
+    JobInfo ji = jobInfoMap.get(jobId);
+    if (ji == null) return;
+    ji.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Used to set task properties.
+   *
+   * @param jobId
+   * @param taskId
+   * @param propName
+   * @param propValue
+   */
+  public void setTaskProperty(String jobId, String taskId, Keys propName, String propValue) {
+    String id = jobId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    ti.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Serialize the task counters and set as a task property.
+   *
+   * @param jobId
+   * @param taskId
+   * @param rj
+   */
+  public void setTaskCounters(String jobId, String taskId, RunningJob rj) {
+    String id = jobId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    StringBuilder sb = new StringBuilder("");
+    try {
+
+      boolean first = true;
+      for (Group group : rj.getCounters()) {
+        for (Counter counter : group) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(',');
+          }
+          sb.append(group.getDisplayName());
+          sb.append('.');
+          sb.append(counter.getDisplayName());
+          sb.append(':');
+          sb.append(counter.getCounter());
+        }
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Could not get counters for task " + id, e);
+    }
+    ti.hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+  }
+
+  /**
+   * Called at the end of a job. A job is one SQL query.
+   *
+   * @param jobid
+   */
+  public void endJob(String jobid) {
+
+    JobInfo ji = jobInfoMap.get(jobid);
+    if (ji == null) return;
+    log(RecordTypes.JobEnd, ji.hm);
+  }
+
+  /**
+   * Called at the start of a task, by Driver.run(). A job can have
+   * multiple tasks, and a task can have multiple operators.
+   *
+   * @param jobId
+   * @param task
+   */
+  public void startTask(String jobId, Task<? extends Serializable> task) {
+    SessionState ss = SessionState.get();
+    if (ss == null) return;
+    TaskInfo ti = new TaskInfo();
+
+    ti.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    ti.hm.put(Keys.TASK_ID.name(), task.getId());
+
+    String id = jobId + ":" + task.getId();
+    taskInfoMap.put(id, ti);
+
+    log(RecordTypes.TaskStart, ti.hm);
+  }
+
+  /**
+   * Called at the end of a task.
+   *
+   * @param jobId
+   * @param task
+   */
+  public void endTask(String jobId, Task<? extends Serializable> task) {
+    String id = jobId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+
+    if (ti == null) return;
+    log(RecordTypes.TaskEnd, ti.hm);
+  }
+
+  /**
+   * Called to log a task's progress while it is running.
+   *
+   * @param jobId
+   * @param task
+   */
+  public void progressTask(String jobId, Task<? extends Serializable> task) {
+    String id = jobId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    log(RecordTypes.TaskProgress, ti.hm);
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 731641)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde.ByteStream;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -150,6 +152,7 @@
 
    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, command);
 
+    SessionState.get().getHiveHistory().startJob();
    try {
      TaskFactory.resetId();
@@ -157,6 +160,9 @@
      ctx.clear();
      ctx.makeScratchDir();
+
+      SessionState.get().getHiveHistory().startJob();
+
      resStream = null;
 
      pd = new ParseDriver();
@@ -176,9 +182,13 @@
      if (jobs > 0) {
        console.printInfo("Total MapReduce jobs = " + jobs);
      }
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_NUM_TASKS, String.valueOf(jobs));
+
      boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.HAS_REDUCE_TASKS, String.valueOf(hasReduce));
      if (hasReduce) {
+        SessionState.get().getHiveHistory().setJobProperty(command, Keys.TASK_NUM_REDUCE_TASKS, String.valueOf(conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS)));
        console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
        console.printInfo("In order to change numer of reducers use:");
        console.printInfo("  set mapred.reduce.tasks = <number>");
@@ -212,12 +222,16 @@
      while(runnable.peek() != null) {
        Task<? extends Serializable> tsk = runnable.remove();
 
+        SessionState.get().getHiveHistory().startTask(command, tsk);
+
        int exitVal = tsk.execute();
-        if (exitVal != 0) {
+        SessionState.get().getHiveHistory().setTaskProperty(command, tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
+        SessionState.get().getHiveHistory().setTaskProperty(command, tsk.getId(), Keys.TASK_NAME, tsk.getClass().getName());
+        SessionState.get().getHiveHistory().endTask(command, tsk);
+        if (exitVal != 0) {
          console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName());
          return 9;
        }
-
        tsk.setDone();
 
        if (tsk.getChildTasks() == null) {
@@ -235,23 +249,30 @@
          }
        }
      }
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(0));
    } catch (SemanticException e) {
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(10));
      console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return (10);
    } catch (ParseException e) {
+
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(11));
      console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return (11);
    } catch (Exception e) {
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(12));
      // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (12); } finally { + SessionState.get().getHiveHistory().endJob(command); if(noName) { conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, ""); } } - + + console.printInfo("OK"); return (0); }