Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 732810)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -100,8 +100,12 @@
 
     // Default file format for CREATE TABLE statement
     // Options: TextFile, SequenceFile
-    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile");
+    HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
+    // Location of Hive run time structured log file
+    HIVEHISTORYFILELOC("hive.joblog.location", "/tmp/" + System.getProperty("user.name"));
+
+
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
 
Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java	(revision 0)
@@ -0,0 +1,166 @@
+package org.apache.hadoop.hive.ql.history;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.history.HiveHistory.JobInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.tools.LineageInfo;
+import org.apache.hadoop.hive.service.HiveInterface;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import junit.framework.TestCase;
+
+public class TestHiveHistory extends TestCase {
+
+  static HiveConf conf;
+
+  static private String tmpdir = "/tmp/" + System.getProperty("user.name") + "/";
+  static private Path tmppath = new Path(tmpdir);
+  static private Hive db;
+  static private FileSystem fs;
+
+  /*
+   * initialize the tables
+   */
+  static {
+    try {
+      conf = new HiveConf(HiveHistory.class);
+
+      fs = FileSystem.get(conf);
+      if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) {
+        throw new RuntimeException(tmpdir + " exists but is not a directory");
+      }
+
+      if (!fs.exists(tmppath)) {
+        if (!fs.mkdirs(tmppath)) {
+          throw new RuntimeException("Could not make scratch directory " + tmpdir);
+        }
+      }
+
+      // copy the test files into hadoop if required.
+      int i = 0;
+      Path[] hadoopDataFile = new Path[2];
+      String[] testFiles = {"kv1.txt", "kv2.txt"};
+      String testFileDir = "file://" + conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+      for (String oneFile : testFiles) {
+        Path localDataFile = new Path(testFileDir, oneFile);
+        hadoopDataFile[i] = new Path(tmppath, oneFile);
+        fs.copyFromLocalFile(false, true, localDataFile, hadoopDataFile[i]);
+        i++;
+      }
+
+      // load the test files into tables
+      i = 0;
+      db = Hive.get(conf);
+      String[] srctables = {"src", "src2"};
+      LinkedList<String> cols = new LinkedList<String>();
+      cols.add("key");
+      cols.add("value");
+      for (String src : srctables) {
+        db.dropTable(src, true, true);
+        db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class);
+        db.loadTable(hadoopDataFile[i], src, false);
+        i++;
+      }
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      throw new RuntimeException("Encountered throwable");
+    }
+  }
+
+  /**
+   * check history file output for this query.
+   */
+  public void testSimpleQuery() {
+    LineageInfo lep = new LineageInfo();
+    try {
+
+      // NOTE: It is critical to do this here so that log4j is reinitialized before
+      // any of the other core hive classes are loaded
+      SessionState.initHiveLog4j();
+
+      CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
+      ss.in = System.in;
+      try {
+        ss.out = new PrintStream(System.out, true, "UTF-8");
+        ss.err = new PrintStream(System.err, true, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        System.exit(3);
+      }
+
+      SessionState.start(ss);
+
+      String cmd = "select a.key from src a";
+      Driver d = new Driver();
+      int ret = d.run(cmd);
+      if (ret != 0) {
+        fail("Failed");
+      }
+      HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get().getHiveHistory().getHistFileName());
+      Map<String, JobInfo> jobInfoMap = hv.getJobInfoMap();
+      Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+      if (jobInfoMap.size() != 1) {
+        fail("jobInfo Map size not 1");
+      }
+
+      if (taskInfoMap.size() != 1) {
+        fail("taskInfo Map size not 1");
+      }
+
+      JobInfo ji = jobInfoMap.get(cmd);
+
+      if (!ji.hm.get(Keys.JOB_NUM_TASKS.name()).equals("1")) {
+        fail("Wrong number of tasks");
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Failed");
+    }
+  }
+
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java	(revision 732810)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java	(working copy)
@@ -354,10 +354,6 @@
   }
 
   public void cliInit(String tname, boolean recreate) throws Exception {
-    if(recreate) {
-      cleanUp();
-      createSources();
-    }
 
     CliSessionState ss = new CliSessionState(conf);
@@ -373,6 +369,11 @@
     ss.setIsSilent(true);
     SessionState.start(ss);
     cliDriver = new CliDriver();
+
+    if(recreate) {
+      cleanUp();
+      createSources();
+    }
   }
 
   public int executeOne(String tname) {
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 732810)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -58,6 +59,10 @@
    */
   protected Hive db;
 
+  /*
+   * HiveHistory object
+   */
+  protected HiveHistory hiveHist;
   /**
    * Streams to read/write from
    */
@@ -168,7 +173,20 @@
     return tss.get();
   }
-
+
+  /**
+   * Get the HiveHistory object, which does structured logging.
+   * @return HiveHistory
+   */
+  public HiveHistory getHiveHistory() {
+    if (hiveHist == null) {
+      hiveHist = new HiveHistory(this);
+    }
+
+    return hiveHist;
+  }
+
+
   private static String makeSessionId() {
     GregorianCalendar gc = new GregorianCalendar();
     String userid = System.getProperty("user.name");
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 732810)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -168,6 +169,10 @@
       console.printInfo("Job running in-process (local Hadoop)");
     } else {
       String hp = job.get("mapred.job.tracker");
+      if (SessionState.get() != null) {
+        SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(),
+            Keys.TASK_HADOOP_ID, rj.getJobID());
+      }
       console.printInfo("Starting Job = " + rj.getJobID() + ",  Tracking URL = " + rj.getTrackingURL());
       console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) +
@@ -192,7 +197,16 @@
 
       report = " map = " + Math.round(rj.mapProgress() * 100) + "%,  reduce =" + Math.round(rj.reduceProgress() * 100) + "%";
 
+      if (!report.equals(lastReport)) {
+
+        SessionState ss = SessionState.get();
+        if (ss != null) {
+          ss.getHiveHistory().setTaskCounters(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), rj);
+          ss.getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), Keys.TASK_HADOOP_PROGRESS, report);
+          ss.getHiveHistory().progressTask(conf.getVar(HiveConf.ConfVars.HIVEQUERYID),
+              this);
+        }
         console.printInfo(report);
         lastReport = report;
       }
@@ -202,6 +216,12 @@
 
   private void inferNumReducers() throws Exception {
     FileSystem fs = FileSystem.get(job);
+    if (work.getReducer() != null) {
+      if (SessionState.get() != null) {
+        SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(),
+            Keys.TASK_NUM_REDUCERS, String.valueOf(work.getNumReduceTasks().intValue()));
+      }
+    }
 
     if ((work.getReducer() != null) && (work.getInferNumReducers() == true)) {
       long inpSz = 0;
@@ -221,8 +241,12 @@
 
       int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
       if (newRed < work.getNumReduceTasks().intValue()) {
+        if (SessionState.get() != null) {
+          SessionState.get().getHiveHistory().setTaskProperty(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), getId(), Keys.TASK_NUM_REDUCERS, String.valueOf(newRed));
+        }
         LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
         work.setNumReduceTasks(Integer.valueOf(newRed));
+
       }
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java	(revision 0)
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.history.HiveHistory.JobInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+
+public class HiveHistoryViewer implements org.apache.hadoop.hive.ql.history.HiveHistory.Listener {
+
+  String historyFile;
+
+  String sessionId;
+
+  // Job Hash Map
+  private HashMap<String, JobInfo> jobInfoMap = new HashMap<String, JobInfo>();
+
+  // Task Hash Map
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  HiveHistoryViewer(String path) {
+    historyFile = path;
+    init();
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public Map<String, JobInfo> getJobInfoMap() {
+    return jobInfoMap;
+  }
+
+  public Map<String, TaskInfo> getTaskInfoMap() {
+    return taskInfoMap;
+  }
+
+  void init() {
+
+    try {
+      HiveHistory.parseHiveHistory(historyFile, this);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * Implementation of the Listener interface; the parser calls this for each record.
+   * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes, java.util.Map)
+   */
+  public void handle(RecordTypes recType, Map<String, String> values) {
+
+    if (recType == RecordTypes.SessionStart) {
+      sessionId = values.get(Keys.SESSION_ID.name());
+    } else if (recType == RecordTypes.JobStart || recType == RecordTypes.JobEnd) {
+      String key = values.get(Keys.JOB_ID.name());
+      JobInfo ji;
+      if (jobInfoMap.containsKey(key)) {
+        ji = jobInfoMap.get(key);
+        ji.hm.putAll(values);
+      } else {
+        ji = new JobInfo();
+        ji.hm = new HashMap<String, String>();
+        ji.hm.putAll(values);
+        jobInfoMap.put(key, ji);
+      }
+    } else if (recType == RecordTypes.TaskStart || recType == RecordTypes.TaskEnd
+        || recType == RecordTypes.TaskProgress) {
+
+      String jobid = values.get(Keys.JOB_ID.name());
+      String taskid = values.get(Keys.TASK_ID.name());
+      String key = jobid + ":" + taskid;
+      TaskInfo ti;
+      if (taskInfoMap.containsKey(key)) {
+        ti = taskInfoMap.get(key);
+        ti.hm.putAll(values);
+      } else {
+        ti = new TaskInfo();
+        ti.hm = new HashMap<String, String>();
+        ti.hm.putAll(values);
+        taskInfoMap.put(key, ti);
+      }
+
+    }
+
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java	(revision 0)
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapred.JobHistory.RecordTypes;
+
+public class HiveHistory {
+
+  PrintWriter histStream; // History file stream
+
+  String histFileName; // History file name
+
+  static final private Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
+
+  private LogHelper console;
+
+  // Job Hash Map
+  private HashMap<String, JobInfo> jobInfoMap = new HashMap<String, JobInfo>();
+
+  // Task Hash Map
+  private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+  private static final String DELIMITER = " ";
+
+  public static enum RecordTypes {
+    JobStart, JobEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+  };
+
+  public static enum Keys {
+    SESSION_ID, JOB_ID, TASK_ID,
+    JOB_STATUS, JOB_RET_CODE, JOB_NUM_TASKS, JOB_HAS_REDUCE_TASKS, JOB_NUM_REDUCE_TASKS,
+    TASK_STATUS, TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS
+  };
+
+  private static final String KEY = "(\\w+)";
+  private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
+
+  private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+
+  // temp buffer for parsed data
+  private static Map<String, String> parseBuffer = new HashMap<String, String>();
+
+  /**
+   * Listener interface.
+   * The parser calls handle for each record type found in the history file.
+   */
+  public static interface Listener {
+
+    public void handle(RecordTypes recType, Map<String, String> values) throws IOException;
+  }
+
+  /**
+   * Parses the history file and calls the Listener for each record.
+   * @param path
+   * @param l
+   * @throws IOException
+   */
+  public static void parseHiveHistory(String path, Listener l) throws IOException {
+    FileInputStream fi = new FileInputStream(path);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fi));
+    try {
+      String line = null;
+      StringBuffer buf = new StringBuffer();
+      while ((line = reader.readLine()) != null) {
+        buf.append(line);
+        if (!line.trim().endsWith("\"")) {
+          continue;
+        }
+        parseLine(buf.toString(), l);
+        buf = new StringBuffer();
+      }
+    } finally {
+      try { reader.close(); } catch (IOException ex) {}
+    }
+  }
+
+  /**
+   * Parse a single line of history.
+   * @param line
+   * @param l
+   * @throws IOException
+   */
+  private static void parseLine(String line, Listener l) throws IOException {
+    // extract the record type
+    int idx = line.indexOf(' ');
+    String recType = line.substring(0, idx);
+    String data = line.substring(idx + 1, line.length());
+
+    Matcher matcher = pattern.matcher(data);
+
+    while (matcher.find()) {
+      String tuple = matcher.group(0);
+      String[] parts = tuple.split("=");
+
+      parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
+    }
+
+    l.handle(RecordTypes.valueOf(recType), parseBuffer);
+
+    parseBuffer.clear();
+  }
+
+  static class Info {
+
+  }
+
+  static class SessionInfo extends Info {
+    public String sessionId;
+  };
+
+  static class JobInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+  };
+
+  static class TaskInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+  };
+
+  /**
+   * Construct a HiveHistory object and open the history log file.
+   * @param ss
+   */
+  public HiveHistory(SessionState ss) {
+
+    try {
+      console = new LogHelper(LOG);
+      String conf_file_loc = ss.getConf().getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC);
+      histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + ".txt";
+      console.printInfo("Hive history file=" + histFileName);
+      histStream = new PrintWriter(histFileName);
+
+      HashedMap hm = new HashedMap();
+      hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
+      log(RecordTypes.SessionStart, hm);
+    } catch (FileNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+  }
+
+  /**
+   * @return histFileName
+   */
+  public String getHistFileName() {
+    return histFileName;
+  }
+
+  /**
+   * Write a history record to the history file.
+   *
+   * @param rt
+   * @param keyValMap
+   */
+  void log(RecordTypes rt, Map<String, String> keyValMap) {
+
+    if (histStream == null) return;
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(rt.name());
+
+    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+
+      sb.append(DELIMITER);
+      String key = ent.getKey();
+      String val = ent.getValue();
+      sb.append(key + "=\"" + val + "\"");
+
+    }
+    histStream.println(sb);
+    histStream.flush();
+
+  }
+
+  /**
+   * Called at the start of a job; called by Driver.run().
+   */
+  public void startJob() {
+    SessionState ss = SessionState.get();
+    if (ss == null) return;
+    JobInfo ji = new JobInfo();
+
+    ji.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    jobInfoMap.put(ss.getCmd(), ji);
+
+    log(RecordTypes.JobStart, ji.hm);
+
+  }
+
+  /**
+   * Used to set job status and other attributes of a job.
+   *
+   * @param jobId
+   * @param propName
+   * @param propValue
+   */
+  public void setJobProperty(String jobId, Keys propName, String propValue) {
+    JobInfo ji = jobInfoMap.get(jobId);
+    if (ji == null) return;
+    ji.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Used to set task properties.
+   *
+   * @param taskId
+   * @param propName
+   * @param propValue
+   */
+  public void setTaskProperty(String jobId, String taskId, Keys propName, String propValue) {
+    String id = jobId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    ti.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Serialize the task counters and set as a task property.
+   *
+   * @param taskId
+   * @param rj
+   */
+  public void setTaskCounters(String jobId, String taskId, RunningJob rj) {
+    String id = jobId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    StringBuilder sb = new StringBuilder("");
+    try {
+
+      boolean first = true;
+      for (Group group : rj.getCounters()) {
+        for (Counter counter : group) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(',');
+          }
+          sb.append(group.getDisplayName());
+          sb.append('.');
+          sb.append(counter.getDisplayName());
+          sb.append(':');
+          sb.append(counter.getCounter());
+        }
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+  }
+
+  /**
+   * Called at the end of a job. A job corresponds to one SQL query.
+   *
+   * @param jobid
+   */
+  public void endJob(String jobid) {
+
+    JobInfo ji = jobInfoMap.get(jobid);
+    if (ji == null) return;
+    log(RecordTypes.JobEnd, ji.hm);
+  }
+
+  /**
+   * Called at the start of a task, by Driver.run(). A job can have
+   * multiple tasks, and a task can have multiple operators.
+   *
+   * @param task
+   */
+  public void startTask(String jobId, Task<? extends Serializable> task, String taskName) {
+    SessionState ss = SessionState.get();
+    if (ss == null) return;
+    TaskInfo ti = new TaskInfo();
+
+    ti.hm.put(Keys.JOB_ID.name(), ss.getCmd());
+    ti.hm.put(Keys.TASK_ID.name(), task.getId());
+    ti.hm.put(Keys.TASK_NAME.name(), taskName);
+
+    String id = jobId + ":" + task.getId();
+    taskInfoMap.put(id, ti);
+
+    log(RecordTypes.TaskStart, ti.hm);
+
+  }
+
+  /**
+   * Called at the end of a task.
+   *
+   * @param task
+   */
+  public void endTask(String jobId, Task<? extends Serializable> task) {
+    String id = jobId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+
+    if (ti == null) return;
+    log(RecordTypes.TaskEnd, ti.hm);
+  }
+
+  /**
+   * Called to log the progress of a task.
+   *
+   * @param task
+   */
+  public void progressTask(String jobId, Task<? extends Serializable> task) {
+    String id = jobId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null) return;
+    log(RecordTypes.TaskProgress, ti.hm);
+
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 732810)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde.ByteStream;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -122,6 +124,19 @@
   }
 
+  public int countReduceTasks(List<Task<? extends Serializable>> tasks) {
+    if (tasks == null)
+      return 0;
+    int reduceTasks = 0;
+    for (Task<? extends Serializable> task : tasks) {
+      if (task.hasReduce()) {
+        reduceTasks++;
+      }
+      reduceTasks += countReduceTasks(task.getChildTasks());
+    }
+    return reduceTasks;
+  }
+
   /**
    * for backwards compatibility with current tests
    */
@@ -154,6 +169,9 @@
     ctx.clear();
     ctx.makeScratchDir();
+
+    SessionState.get().getHiveHistory().startJob();
+
     resStream = null;
 
     pd = new ParseDriver();
@@ -173,9 +191,14 @@
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_NUM_TASKS, String.valueOf(jobs));
+
       boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_HAS_REDUCE_TASKS, String.valueOf(hasReduce));
       if (hasReduce) {
+
+        SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_NUM_REDUCE_TASKS, String.valueOf(countReduceTasks(sem.getRootTasks())));
         console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
         console.printInfo("In order to change numer of reducers use:");
         console.printInfo("  set mapred.reduce.tasks = <number>");
@@ -209,12 +232,15 @@
       while(runnable.peek() != null) {
         Task<? extends Serializable> tsk = runnable.remove();
+        SessionState.get().getHiveHistory().startTask(command, tsk, tsk.getClass().getName());
+
         int exitVal = tsk.execute();
-        if (exitVal != 0) {
+        SessionState.get().getHiveHistory().setTaskProperty(command, tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
+        SessionState.get().getHiveHistory().endTask(command, tsk);
+        if (exitVal != 0) {
           console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName());
           return 9;
         }
-
        tsk.setDone();
 
        if (tsk.getChildTasks() == null) {
@@ -232,23 +258,32 @@
           }
         }
       }
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(0));
     } catch (SemanticException e) {
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(10));
       console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (10);
     } catch (ParseException e) {
+
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(11));
       console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (11);
    } catch (Exception e) {
+      SessionState.get().getHiveHistory().setJobProperty(command, Keys.JOB_RET_CODE, String.valueOf(12));
       // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
       console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return (12);
     } finally {
+      SessionState ss = SessionState.get();
+      HiveHistory h = ss.getHiveHistory();
+      h.endJob(command);
       if(noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
       }
     }
-
+
+    console.printInfo("OK");
     return (0);
   }
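
Editor's note, for illustration only (not part of the patch): a minimal sketch of how the structured log written by HiveHistory could be read back through the public parseHiveHistory/Listener API added above. Each log line is a record type followed by KEY="value" pairs (see HiveHistory.log()); HiveHistoryViewer in this patch builds its job and task maps the same way. The class name and file path below are hypothetical; at run time the path would come from SessionState.get().getHiveHistory().getHistFileName().

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;

// Hypothetical example class; only the HiveHistory API it calls comes from this patch.
public class PrintHiveHistory implements HiveHistory.Listener {

  // The parser invokes this once per record with the record type and its key/value pairs.
  public void handle(RecordTypes recType, Map<String, String> values) throws IOException {
    System.out.println(recType.name()
        + " job=" + values.get(Keys.JOB_ID.name())
        + " task=" + values.get(Keys.TASK_ID.name())
        + " progress=" + values.get(Keys.TASK_HADOOP_PROGRESS.name()));
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical path; with the default hive.joblog.location the file lands
    // under /tmp/<user.name>/hive_job_log_<sessionid>.txt.
    String histFile = "/tmp/hive_job_log_example.txt";
    HiveHistory.parseHiveHistory(histFile, new PrintHiveHistory());
  }
}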