Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 734538) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -78,8 +78,13 @@ // session identifier HIVESESSIONID("hive.session.id", ""), + // query being executed (multiple per session) - HIVEQUERYID("hive.query.string", ""), + HIVEQUERYSTRING("hive.query.string", ""), + + // id of query being executed (multiple per session) + HIVEQUERYID("hive.query.id", ""), + // id of the mapred plan being executed (multiple per query) HIVEPLANID("hive.query.planid", ""), // max jobname length @@ -100,8 +105,12 @@ // Default file format for CREATE TABLE statement // Options: TextFile, SequenceFile - HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"); + HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"), + //Location of Hive run time structured log file + HIVEHISTORYFILELOC("hive.querylog.location", "/tmp/"+System.getProperty("user.name")); + + public final String varname; public final String defaultVal; public final int defaultIntVal; Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 734538) +++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy) @@ -250,7 +250,6 @@ System.exit(3); } - SessionState.start(ss); if(! oproc.process_stage2(ss)) { System.exit(2); @@ -261,6 +260,8 @@ for(Map.Entry item: ss.cmdProperties.entrySet()) { conf.set((String) item.getKey(), (String) item.getValue()); } + + SessionState.start(ss); CliDriver cli = new CliDriver (); Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 0) +++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 0) @@ -0,0 +1,164 @@ +package org.apache.hadoop.hive.ql.history; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
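For reference, the hive.query.string, hive.query.id and hive.querylog.location variables added to HiveConf above are read and written elsewhere in this patch through HiveConf.setVar()/getVar(). A minimal, illustrative sketch of that usage (the class name and literal values here are made up):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ConfVarSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(ConfVarSketch.class);

        // Driver.run() sets these two per query.
        conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, "select a.key from src a");
        conf.setVar(HiveConf.ConfVars.HIVEQUERYID, "someuser_200901010101");

        // HiveHistory reads this one to decide where the structured log goes;
        // it defaults to /tmp/<user.name> as declared above.
        System.out.println("query log dir = "
            + conf.getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC));
      }
    }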
+ */ + +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Map; +import java.util.TreeSet; +import java.util.Map.Entry; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.tools.LineageInfo; +import org.apache.hadoop.hive.service.HiveInterface; +import org.apache.hadoop.mapred.TextInputFormat; + +import junit.framework.TestCase; + +public class TestHiveHistory extends TestCase { + + static HiveConf conf; + + static private String tmpdir = "/tmp/" + System.getProperty("user.name") + + "/"; + static private Path tmppath = new Path(tmpdir); + static private Hive db; + static private FileSystem fs; + + /* + * intialize the tables + */ + + protected void setUp(){ + try { + conf = new HiveConf(HiveHistory.class); + + fs = FileSystem.get(conf); + if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) { + throw new RuntimeException(tmpdir + " exists but is not a directory"); + } + + if (!fs.exists(tmppath)) { + if (!fs.mkdirs(tmppath)) { + throw new RuntimeException("Could not make scratch directory " + + tmpdir); + } + } + + // copy the test files into hadoop if required. + int i = 0; + Path[] hadoopDataFile = new Path[2]; + String[] testFiles = { "kv1.txt", "kv2.txt" }; + String testFileDir = "file://" + + conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + for (String oneFile : testFiles) { + Path localDataFile = new Path(testFileDir, oneFile); + hadoopDataFile[i] = new Path(tmppath, oneFile); + fs.copyFromLocalFile(false, true, localDataFile, hadoopDataFile[i]); + i++; + } + + // load the test files into tables + i = 0; + db = Hive.get(conf); + String[] srctables = { "src", "src2" }; + LinkedList cols = new LinkedList(); + cols.add("key"); + cols.add("value"); + for (String src : srctables) { + db.dropTable(src, true, true); + db.createTable(src, cols, null, TextInputFormat.class, + IgnoreKeyTextOutputFormat.class); + db.loadTable(hadoopDataFile[i], src, false); + i++; + } + + } catch (Throwable e) { + e.printStackTrace(); + throw new RuntimeException("Encountered throwable"); + } +} + + /** + * check history file output for this query.als + */ + public void testSimpleQuery() { + LineageInfo lep = new LineageInfo(); + try { + + // NOTE: It is critical to do this here so that log4j is reinitialized + // before + // any of the other core hive classes are loaded + SessionState.initHiveLog4j(); + + CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class)); + ss.in = System.in; + try { + ss.out = new PrintStream(System.out, true, "UTF-8"); + ss.err = new PrintStream(System.err, true, "UTF-8"); + } catch (UnsupportedEncodingException e) { + System.exit(3); + } + + SessionState.start(ss); + + String cmd = "select a.key from src a"; + Driver d = new Driver(); + int ret = d.run(cmd); + if (ret != 0) { + fail("Failed"); + } + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = 
hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + if (jobInfoMap.size() != 1) { + fail("jobInfo Map size not 1"); + } + + if (taskInfoMap.size() != 1) { + fail("jobInfo Map size not 1"); + } + + + cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_NUM_TASKS.name()).equals("1")) { + fail("Wrong number of tasks"); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed"); + } + } + +} Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 734538) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -159,6 +159,8 @@ this.logDir = logDir; conf = new HiveConf(Driver.class); + CliSessionState ss = new CliSessionState(conf); + SessionState.start(ss); // System.out.println(conf.toString()); testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); @@ -371,8 +373,8 @@ createSources(); } - CliSessionState ss = new CliSessionState(conf); - + //CliSessionState ss = new CliSessionState(conf); + SessionState ss = SessionState.get(); ss.in = System.in; File qf = new File(outDir, tname); Index: ql/src/test/templates/TestCliDriver.vm =================================================================== --- ql/src/test/templates/TestCliDriver.vm (revision 734538) +++ ql/src/test/templates/TestCliDriver.vm (working copy) @@ -9,7 +9,14 @@ import org.apache.hadoop.hive.ql.QTestUtil; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; +import org.apache.hadoop.hive.ql.session.SessionState; + + import org.antlr.runtime.*; import org.antlr.runtime.tree.*; @@ -62,7 +69,20 @@ if (ecode != 0) { fail("Client Execution failed with error code = " + ecode); } + if (SessionState.get() != null) { + HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get() + .getHiveHistory().getHistFileName()); + Map jobInfoMap = hv.getJobInfoMap(); + Map taskInfoMap = hv.getTaskInfoMap(); + String cmd = (String)jobInfoMap.keySet().toArray()[0]; + QueryInfo ji = jobInfoMap.get(cmd); + + if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) { + fail("Wrong return code in hive history"); + } + } + ecode = qt.checkCliDriverResults("$fname"); if (ecode != 0) { fail("Client execution results dailed with error code = " + ecode); Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (revision 734538) +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.commons.lang.StringUtils; @@ -58,6 +59,10 @@ */ protected Hive db; + /* + * HiveHistory Object + */ + protected HiveHistory hiveHist; /** * Streams to read/write from */ @@ -121,10 +126,15 @@ } public void setCmd(String cmdString) { - conf.setVar(HiveConf.ConfVars.HIVEQUERYID, cmdString); + conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, cmdString); } public String 
getCmd() { + return (conf.getVar(HiveConf.ConfVars.HIVEQUERYSTRING)); + } + + + public String getQueryId() { return (conf.getVar(HiveConf.ConfVars.HIVEQUERYID)); } @@ -144,6 +154,7 @@ public static SessionState start(HiveConf conf) { SessionState ss = new SessionState (conf); ss.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId()); + ss.hiveHist = new HiveHistory(ss); tss.set(ss); return (ss); } @@ -154,10 +165,15 @@ * session object when switching from one session to another */ public static SessionState start(SessionState startSs) { + tss.set(startSs); if(StringUtils.isEmpty(startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID))) { startSs.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId()); } + + if (startSs.hiveHist == null){ + startSs.hiveHist = new HiveHistory(startSs); + } return startSs; } @@ -168,7 +184,16 @@ return tss.get(); } - + + /** + * get hiveHitsory object which does structured logging + * @return + */ + public HiveHistory getHiveHistory(){ + return hiveHist; + } + + private static String makeSessionId() { GregorianCalendar gc = new GregorianCalendar(); String userid = System.getProperty("user.name"); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 734538) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.mapredWork; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.io.*; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.session.SessionState; @@ -61,15 +62,16 @@ public static String getRealFiles(Configuration conf) { // fill in local files to be added to the task environment SessionState ss = SessionState.get(); - Set files = (ss == null) ? null : ss.list_resource(SessionState.ResourceType.FILE, null); - if(files != null) { - ArrayList realFiles = new ArrayList (files.size()); - for(String one: files) { + Set files = (ss == null) ? 
null : ss.list_resource( + SessionState.ResourceType.FILE, null); + if (files != null) { + ArrayList realFiles = new ArrayList(files.size()); + for (String one : files) { try { realFiles.add(Utilities.realFile(one, conf)); } catch (IOException e) { - throw new RuntimeException ("Cannot validate file " + one + - "due to exception: " + e.getMessage(), e); + throw new RuntimeException("Cannot validate file " + one + + "due to exception: " + e.getMessage(), e); } } return StringUtils.join(realFiles, ","); @@ -78,11 +80,10 @@ } } - /** * Initialization when invoked from QL */ - public void initialize (HiveConf conf) { + public void initialize(HiveConf conf) { super.initialize(conf); job = new JobConf(conf, ExecDriver.class); String realFiles = getRealFiles(job); @@ -91,7 +92,7 @@ // workaround for hadoop-17 - jobclient only looks at commandlineconfig Configuration commandConf = JobClient.getCommandLineConfig(); - if(commandConf != null) { + if (commandConf != null) { commandConf.set("tmpfiles", realFiles); } } @@ -100,63 +101,70 @@ /** * Constructor/Initialization for invocation as independent utility */ - public ExecDriver(mapredWork plan, JobConf job, boolean isSilent) throws HiveException { + public ExecDriver(mapredWork plan, JobConf job, boolean isSilent) + throws HiveException { setWork(plan); this.job = job; LOG = LogFactory.getLog(this.getClass().getName()); - console = new LogHelper(LOG, isSilent); + console = new LogHelper(LOG, isSilent); } protected void fillInDefaults() { // this is a temporary hack to fix things that are not fixed in the compiler - if(work.getNumReduceTasks() == null) { - if(work.getReducer() == null) { - LOG.warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator"); + if (work.getNumReduceTasks() == null) { + if (work.getReducer() == null) { + LOG + .warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator"); work.setNumReduceTasks(Integer.valueOf(0)); } else { - LOG.warn("Number of reduce tasks not specified. Defaulting to jobconf value of: " + job.getNumReduceTasks()); + LOG + .warn("Number of reduce tasks not specified. Defaulting to jobconf value of: " + + job.getNumReduceTasks()); work.setNumReduceTasks(job.getNumReduceTasks()); } - } - else - LOG.info("Number of reduce tasks determined at compile : " + work.getNumReduceTasks()); + } else + LOG.info("Number of reduce tasks determined at compile : " + + work.getNumReduceTasks()); } /** - * A list of the currently running jobs spawned in this Hive instance that is used - * to kill all running jobs in the event of an unexpected shutdown - i.e., the JVM shuts - * down while there are still jobs running. + * A list of the currently running jobs spawned in this Hive instance that is + * used to kill all running jobs in the event of an unexpected shutdown - + * i.e., the JVM shuts down while there are still jobs running. */ - public static HashMap runningJobKillURIs = new HashMap (); + public static HashMap runningJobKillURIs = new HashMap(); - /** - * In Hive, when the user control-c's the command line, any running jobs spawned from that command - * line are best-effort killed. - * - * This static constructor registers a shutdown thread to iterate over all the running job - * kill URLs and do a get on them. - * + * In Hive, when the user control-c's the command line, any running jobs + * spawned from that command line are best-effort killed. 
+ * + * This static constructor registers a shutdown thread to iterate over all the + * running job kill URLs and do a get on them. + * */ static { - if(new org.apache.hadoop.conf.Configuration().getBoolean("webinterface.private.actions", false)) { + if (new org.apache.hadoop.conf.Configuration().getBoolean( + "webinterface.private.actions", false)) { Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - for(Iterator elems = runningJobKillURIs.values().iterator(); elems.hasNext() ; ) { - String uri = elems.next(); - try { - System.err.println("killing job with: " + uri); - int retCode = ((java.net.HttpURLConnection)new java.net.URL(uri).openConnection()).getResponseCode(); - if(retCode != 200) { - System.err.println("Got an error trying to kill job with URI: " + uri + " = " + retCode); - } - } catch(Exception e) { - System.err.println("trying to kill job, caught: " + e); - // do nothing + public void run() { + for (Iterator elems = runningJobKillURIs.values().iterator(); elems + .hasNext();) { + String uri = elems.next(); + try { + System.err.println("killing job with: " + uri); + int retCode = ((java.net.HttpURLConnection) new java.net.URL(uri) + .openConnection()).getResponseCode(); + if (retCode != 200) { + System.err.println("Got an error trying to kill job with URI: " + + uri + " = " + retCode); } + } catch (Exception e) { + System.err.println("trying to kill job, caught: " + e); + // do nothing } } - }); + } + }); } } @@ -168,19 +176,23 @@ console.printInfo("Job running in-process (local Hadoop)"); } else { String hp = job.get("mapred.job.tracker"); - console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + rj.getTrackingURL()); - console.printInfo("Kill Command = " + - HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) + - " job -Dmapred.job.tracker=" + hp + " -kill " - + rj.getJobID()); + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setTaskProperty( + SessionState.get().getQueryId(), getId(), + Keys.TASK_HADOOP_ID, rj.getJobID()); + } + console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + + rj.getTrackingURL()); + console.printInfo("Kill Command = " + + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) + + " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID()); } } /** * from StreamJob.java */ - public RunningJob jobProgress(JobClient jc, RunningJob rj) - throws IOException { + public RunningJob jobProgress(JobClient jc, RunningJob rj) throws IOException { String lastReport = ""; while (!rj.isComplete()) { try { @@ -190,9 +202,20 @@ rj = jc.getJob(rj.getJobID()); String report = null; report = " map = " + Math.round(rj.mapProgress() * 100) + "%, reduce =" - + Math.round(rj.reduceProgress() * 100) + "%"; - + + Math.round(rj.reduceProgress() * 100) + "%"; + if (!report.equals(lastReport)) { + + SessionState ss = SessionState.get(); + if (ss != null) { + ss.getHiveHistory().setTaskCounters( + SessionState.get().getQueryId(), getId(), rj); + ss.getHiveHistory().setTaskProperty( + SessionState.get().getQueryId(), getId(), + Keys.TASK_HADOOP_PROGRESS, report); + ss.getHiveHistory().progressTask( + SessionState.get().getQueryId(), this); + } console.printInfo(report); lastReport = report; } @@ -202,48 +225,51 @@ private void inferNumReducers() throws Exception { FileSystem fs = FileSystem.get(job); - + if ((work.getReducer() != null) && (work.getInferNumReducers() == true)) { long inpSz = 0; - + // based on the input size - estimate the number of reducers Path[] inputPaths = 
FileInputFormat.getInputPaths(job); - + for (Path inputP : inputPaths) { if (fs.exists(inputP)) { FileStatus[] fStats = fs.listStatus(inputP); - for (FileStatus fStat:fStats) + for (FileStatus fStat : fStats) inpSz += fStat.getLen(); } } - - int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1; - if (newRed < work.getNumReduceTasks().intValue()) - { - LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed); + int newRed = (int) (inpSz / LOAD_PER_REDUCER) + 1; + if (newRed < work.getNumReduceTasks().intValue()) { + + LOG.warn("Number of reduce tasks inferred based on input size to : " + + newRed); work.setNumReduceTasks(Integer.valueOf(newRed)); + } } } /** * Add new elements to the classpath - * @param newPaths Array of classpath elements + * + * @param newPaths + * Array of classpath elements */ - private static void addToClassPath(String [] newPaths) throws Exception { + private static void addToClassPath(String[] newPaths) throws Exception { Thread curThread = Thread.currentThread(); - URLClassLoader loader = (URLClassLoader)curThread.getContextClassLoader(); + URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader(); List curPath = Arrays.asList(loader.getURLs()); - ArrayList newPath = new ArrayList (); + ArrayList newPath = new ArrayList(); - for(String onestr: newPaths) { + for (String onestr : newPaths) { URL oneurl = (new File(onestr)).toURL(); - if(!curPath.contains(oneurl)) { + if (!curPath.contains(oneurl)) { newPath.add(oneurl); } } - + loader = new URLClassLoader(newPath.toArray(new URL[0]), loader); curThread.setContextClassLoader(loader); } @@ -256,35 +282,34 @@ fillInDefaults(); String invalidReason = work.isInvalid(); - if(invalidReason != null) { - throw new RuntimeException("Plan invalid, Reason: "+invalidReason); + if (invalidReason != null) { + throw new RuntimeException("Plan invalid, Reason: " + invalidReason); } Utilities.setMapRedWork(job, work); - - - for(String onefile: work.getPathToAliases().keySet()) { + + for (String onefile : work.getPathToAliases().keySet()) { LOG.info("Adding input file " + onefile); FileInputFormat.addInputPaths(job, onefile); } - + String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR); String jobScratchDir = hiveScratchDir + Utilities.randGen.nextInt(); FileOutputFormat.setOutputPath(job, new Path(jobScratchDir)); job.setMapperClass(ExecMapper.class); - - job.setMapOutputKeyClass(HiveKey.class); + + job.setMapOutputKeyClass(HiveKey.class); job.setMapOutputValueClass(BytesWritable.class); - + job.setNumReduceTasks(work.getNumReduceTasks().intValue()); job.setReducerClass(ExecReducer.class); - + job.setInputFormat(org.apache.hadoop.hive.ql.io.HiveInputFormat.class); - - // No-Op - we don't really write anything here .. + + // No-Op - we don't really write anything here .. 
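For reference, inferNumReducers above boils down to one formula: one reducer per LOAD_PER_REDUCER bytes of input, plus one, and the result only ever lowers the configured reducer count. A standalone restatement of that arithmetic (the 1 GB constant is an assumption; the real value is defined elsewhere in ExecDriver):

    public class ReducerInference {
      // assumed value; the actual LOAD_PER_REDUCER constant lives in ExecDriver
      static final long LOAD_PER_REDUCER = 1024L * 1024L * 1024L;

      static int inferReducers(long totalInputBytes, int configuredReducers) {
        int inferred = (int) (totalInputBytes / LOAD_PER_REDUCER) + 1;
        // only shrink the configured value, never grow it
        return inferred < configuredReducers ? inferred : configuredReducers;
      }

      public static void main(String[] args) {
        // e.g. ~2.5 GB of input with 10 configured reducers -> 3 reducers
        System.out.println(inferReducers(2684354560L, 10));
      }
    }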
job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); - + String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS); if (StringUtils.isNotBlank(auxJars)) { LOG.info("adding libjars: " + auxJars); @@ -294,47 +319,62 @@ int returnVal = 0; FileSystem fs = null; RunningJob rj = null; - + try { fs = FileSystem.get(job); - + // if the input is empty exit gracefully Path[] inputPaths = FileInputFormat.getInputPaths(job); boolean emptyInput = true; for (Path inputP : inputPaths) { - if(!fs.exists(inputP)) + if (!fs.exists(inputP)) continue; - + FileStatus[] fStats = fs.listStatus(inputP); - for (FileStatus fStat:fStats) { - if (fStat.getLen() > 0) { - emptyInput = false; - break; - } + for (FileStatus fStat : fStats) { + if (fStat.getLen() > 0) { + emptyInput = false; + break; + } } } - + if (emptyInput) { console.printInfo("Job need not be submitted: no output: Success"); - return 0; + return 0; } + + inferNumReducers(); + - inferNumReducers(); + if (SessionState.get() != null) { + if (work.getReducer() != null) { + SessionState.get().getHiveHistory().setTaskProperty( + SessionState.get().getQueryId(), getId(), + Keys.TASK_NUM_REDUCERS, String.valueOf(work.getNumReduceTasks())); + } else { + SessionState.get().getHiveHistory().setTaskProperty( + SessionState.get().getQueryId(), getId(), + Keys.TASK_NUM_REDUCERS, String.valueOf(0)); + } + } JobClient jc = new JobClient(job); - + // make this client wait if job trcker is not behaving well. Throttle.checkJobTracker(job, LOG); rj = jc.submitJob(job); - // add to list of running jobs so in case of abnormal shutdown can kill it. - runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill"); + // add to list of running jobs so in case of abnormal shutdown can kill + // it. + runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + + "&action=kill"); jobInfo(rj); rj = jobProgress(jc, rj); String statusMesg = "Ended Job = " + rj.getJobID(); - if(!rj.isSuccessful()) { + if (!rj.isSuccessful()) { statusMesg += " with errors"; returnVal = 2; console.printError(statusMesg); @@ -343,45 +383,49 @@ } } catch (Exception e) { String mesg = " with exception '" + e.getMessage() + "'"; - if(rj != null) { + if (rj != null) { mesg = "Ended Job = " + rj.getJobID() + mesg; } else { mesg = "Job Submission failed" + mesg; } - // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils - console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + console.printError(mesg, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); returnVal = 1; } finally { Utilities.clearMapRedWork(job); try { fs.delete(new Path(jobScratchDir), true); - if(returnVal != 0 && rj != null) { + if (returnVal != 0 && rj != null) { rj.killJob(); } runningJobKillURIs.remove(rj.getJobID()); - } catch (Exception e) {} + } catch (Exception e) { + } } return (returnVal); } - + private static void printUsage() { - System.out.println("ExecDriver -plan [-jobconf k1=v1 [-jobconf k2=v2] ...] "+ - "[-files [,] ...]"); + System.out + .println("ExecDriver -plan [-jobconf k1=v1 [-jobconf k2=v2] ...] 
" + + "[-files [,] ...]"); System.exit(1); } public static void main(String[] args) throws IOException, HiveException { String planFileName = null; - ArrayList jobConfArgs = new ArrayList (); + ArrayList jobConfArgs = new ArrayList(); boolean isSilent = false; String files = null; - try{ - for(int i=0; i jobInfoMap = new HashMap(); + + // Task Hash Map + private HashMap taskInfoMap = new HashMap(); + + public HiveHistoryViewer(String path) { + historyFile = path; + init(); + } + + public String getSessionId() { + return sessionId; + } + + public Map getJobInfoMap() { + return jobInfoMap; + } + + public Map getTaskInfoMap() { + return taskInfoMap; + } + + /** + * parse history files + */ + void init() { + + try { + HiveHistory.parseHiveHistory(historyFile, this); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + /** + * Implementation Listner interface function + * + * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes, + * java.util.Map) + */ + public void handle(RecordTypes recType, Map values) { + + if (recType == RecordTypes.SessionStart) { + sessionId = values.get(Keys.SESSION_ID.name()); + } else if (recType == RecordTypes.QueryStart || recType == RecordTypes.QueryEnd) { + String key = values.get(Keys.QUERY_ID.name()); + QueryInfo ji; + if (jobInfoMap.containsKey(key)) { + ji = jobInfoMap.get(key); + + ji.hm.putAll(values); + + } else { + ji = new QueryInfo(); + ji.hm = new HashMap(); + ji.hm.putAll(values); + + jobInfoMap.put(key, ji); + + } + } else if (recType == RecordTypes.TaskStart + || recType == RecordTypes.TaskEnd + || recType == RecordTypes.TaskProgress) { + + String jobid = values.get(Keys.QUERY_ID.name()); + String taskid = values.get(Keys.TASK_ID.name()); + String key = jobid + ":" + taskid; + TaskInfo ti; + if (taskInfoMap.containsKey(key)) { + ti = taskInfoMap.get(key); + ti.hm.putAll(values); + } else { + ti = new TaskInfo(); + ti.hm = new HashMap(); + ti.hm.putAll(values); + taskInfoMap.put(key, ti); + + } + + } + + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (revision 0) @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.history; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.Counters.Group; + + +public class HiveHistory { + + PrintWriter histStream; // History File stream + + String histFileName; // History file name + + static final private Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory"); + + private LogHelper console; + + // Job Hash Map + private HashMap queryInfoMap = new HashMap(); + + // Task Hash Map + private HashMap taskInfoMap = new HashMap(); + + private static final String DELIMITER = " "; + + public static enum RecordTypes { + QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd + }; + + public static enum Keys { + SESSION_ID, QUERY_ID, TASK_ID, QUERY_RET_CODE, QUERY_NUM_TASKS, QUERY_STRING, TIME, + TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS + }; + + private static final String KEY = "(\\w+)"; + private static final String VALUE = "[[^\"]?]+"; // anything but a " in "" + + private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + + VALUE + "\""); + + // temp buffer for parsed dataa + private static Map parseBuffer = new HashMap(); + + /** + * Listner interface Parser will call handle function for each record type + */ + public static interface Listener { + + public void handle(RecordTypes recType, Map values) + throws IOException; + } + + /** + * Parses history file and calls call back functions + * + * @param path + * @param l + * @throws IOException + */ + public static void parseHiveHistory(String path, Listener l) + throws IOException { + FileInputStream fi = new FileInputStream(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(fi)); + try { + String line = null; + StringBuffer buf = new StringBuffer(); + while ((line = reader.readLine()) != null) { + buf.append(line); + //if it does not end with " then it is line continuation + if (!line.trim().endsWith("\"")) { + continue; + } + parseLine(buf.toString(), l); + buf = new StringBuffer(); + } + } finally { + try { + reader.close(); + } catch (IOException ex) { + } + } + } + + /** + * Parse a single line of history. 
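Each history record is a single line: a record-type token followed by space-separated KEY="value" pairs, which is what the KEY/VALUE pattern above pulls apart. A self-contained sketch of that parsing step on a made-up line (the regex here is a simplified equivalent of the one defined above):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ParseLineSketch {
      public static void main(String[] args) {
        // invented record in the shape HiveHistory.log() writes
        String line = "TaskStart QUERY_ID=\"bob_200901010101\" TASK_ID=\"Stage-1\" TIME=\"1231234567890\"";

        int idx = line.indexOf(' ');
        String recType = line.substring(0, idx);        // "TaskStart"
        String data = line.substring(idx + 1);

        Pattern p = Pattern.compile("(\\w+)=\"[^\"]+\"");
        Matcher m = p.matcher(data);
        Map<String, String> values = new HashMap<String, String>();
        while (m.find()) {
          String[] parts = m.group(0).split("=");
          values.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
        }
        System.out.println(recType + " -> " + values);
      }
    }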
+   *
+   * @param line
+   * @param l
+   * @throws IOException
+   */
+  private static void parseLine(String line, Listener l) throws IOException {
+    // extract the record type
+    int idx = line.indexOf(' ');
+    String recType = line.substring(0, idx);
+    String data = line.substring(idx + 1, line.length());
+
+    Matcher matcher = pattern.matcher(data);
+
+    while (matcher.find()) {
+      String tuple = matcher.group(0);
+      String[] parts = tuple.split("=");
+
+      parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
+    }
+
+    l.handle(RecordTypes.valueOf(recType), parseBuffer);
+
+    parseBuffer.clear();
+  }
+
+  public static class Info {
+
+  }
+
+  public static class SessionInfo extends Info {
+    public String sessionId;
+  };
+
+  public static class QueryInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+  };
+
+  public static class TaskInfo extends Info {
+    public Map<String, String> hm = new HashMap<String, String>();
+
+  };
+
+  /**
+   * Construct a HiveHistory object and open the history log file.
+   *
+   * @param ss
+   */
+  public HiveHistory(SessionState ss) {
+
+    try {
+      console = new LogHelper(LOG);
+      String conf_file_loc = ss.getConf().getVar(
+          HiveConf.ConfVars.HIVEHISTORYFILELOC);
+      if ((conf_file_loc == null) || conf_file_loc.length() == 0)
+      {
+        console.printError("No history file location given");
+        return;
+      }
+      histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId()
+          + ".txt";
+      console.printInfo("Hive history file=" + histFileName);
+      histStream = new PrintWriter(histFileName);
+
+      HashMap<String, String> hm = new HashMap<String, String>();
+      hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
+      log(RecordTypes.SessionStart, hm);
+    } catch (FileNotFoundException e) {
+      console.printError("FAILED: Failed to open Query Log : " + histFileName
+          + " " + e.getMessage(), "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+  }
+
+  /**
+   * @return historyFileName
+   */
+  public String getHistFileName() {
+    return histFileName;
+  }
+
+  /**
+   * Write a history record to the history file.
+   *
+   * @param rt
+   * @param keyValMap
+   */
+  void log(RecordTypes rt, Map<String, String> keyValMap) {
+
+    if (histStream == null)
+      return;
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(rt.name());
+
+    for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+
+      sb.append(DELIMITER);
+      String key = ent.getKey();
+      String val = ent.getValue();
+      val = val.replace('\n', ' ');
+      sb.append(key + "=\"" + val + "\"");
+
+    }
+    sb.append(DELIMITER);
+    sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\"");
+    histStream.println(sb);
+    histStream.flush();
+
+  }
+
+  /**
+   * Called by Driver.run() at the start of a query.
+   */
+  public void startQuery(String cmd, String id) {
+    SessionState ss = SessionState.get();
+    if (ss == null)
+      return;
+    QueryInfo ji = new QueryInfo();
+
+    ji.hm.put(Keys.QUERY_ID.name(), id);
+    ji.hm.put(Keys.QUERY_STRING.name(), cmd);
+
+    queryInfoMap.put(id, ji);
+
+    log(RecordTypes.QueryStart, ji.hm);
+
+  }
+
+  /**
+   * Used to set job status and other attributes of a job.
+   *
+   * @param queryId
+   * @param propName
+   * @param propValue
+   */
+  public void setQueryProperty(String queryId, Keys propName, String propValue) {
+    QueryInfo ji = queryInfoMap.get(queryId);
+    if (ji == null)
+      return;
+    ji.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Used to set task properties.
+   *
+   * @param taskId
+   * @param propName
+   * @param propValue
+   */
+  public void setTaskProperty(String queryId, String taskId, Keys propName,
+      String propValue) {
+    String id = queryId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null)
+      return;
+    ti.hm.put(propName.name(), propValue);
+  }
+
+  /**
+   * Serialize the task counters and set as a task property.
+   *
+   * @param taskId
+   * @param rj
+   */
+  public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+    String id = queryId + ":" + taskId;
+    TaskInfo ti = taskInfoMap.get(id);
+    if (ti == null)
+      return;
+    StringBuilder sb = new StringBuilder("");
+    try {
+
+      boolean first = true;
+      for (Group group : rj.getCounters()) {
+        for (Counter counter : group) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(',');
+          }
+          sb.append(group.getDisplayName());
+          sb.append('.');
+          sb.append(counter.getDisplayName());
+          sb.append(':');
+          sb.append(counter.getCounter());
+        }
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+  }
+
+  /**
+   * Called at the end of a query. A query is a single SQL statement.
+   *
+   * @param queryId
+   */
+  public void endQuery(String queryId) {
+
+    QueryInfo ji = queryInfoMap.get(queryId);
+    if (ji == null)
+      return;
+    log(RecordTypes.QueryEnd, ji.hm);
+  }
+
+  /**
+   * Called by Driver.run() at the start of a task. A query can have multiple
+   * tasks, and each task can have multiple operators.
+   *
+   * @param task
+   */
+  public void startTask(String queryId, Task<? extends Serializable> task,
+      String taskName) {
+    SessionState ss = SessionState.get();
+    if (ss == null)
+      return;
+    TaskInfo ti = new TaskInfo();
+
+    ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId());
+    ti.hm.put(Keys.TASK_ID.name(), task.getId());
+    ti.hm.put(Keys.TASK_NAME.name(), taskName);
+
+    String id = queryId + ":" + task.getId();
+    taskInfoMap.put(id, ti);
+
+    log(RecordTypes.TaskStart, ti.hm);
+
+  }
+
+  /**
+   * Called at the end of a task.
+   *
+   * @param task
+   */
+  public void endTask(String queryId, Task<? extends Serializable> task) {
+    String id = queryId + ":" + task.getId();
+    TaskInfo ti = taskInfoMap.get(id);
+
+    if (ti == null)
+      return;
+    log(RecordTypes.TaskEnd, ti.hm);
+  }
+
+  /**
+   * Called to record the progress of a task.
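setTaskCounters above flattens the Hadoop counters of a running job into one comma-separated string of groupDisplayName.counterDisplayName:value entries, stored under TASK_COUNTERS. A small sketch of how a consumer of the history file might split that value back apart (the sample string is invented, and counter names containing commas would defeat this naive split):

    public class CounterSplitSketch {
      public static void main(String[] args) {
        // invented value in the shape setTaskCounters produces
        String taskCounters = "Map-Reduce Framework.Map input records:500,"
            + "Map-Reduce Framework.Reduce output records:25";

        for (String entry : taskCounters.split(",")) {
          int colon = entry.lastIndexOf(':');
          String name = entry.substring(0, colon);
          long value = Long.parseLong(entry.substring(colon + 1));
          System.out.println(name + " = " + value);
        }
      }
    }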
+ * + * @param task + */ + public void progressTask(String queryId, Task task) { + String id = queryId + ":" + task.getId(); + TaskInfo ti = taskInfoMap.get(id); + if (ti == null) + return; + log(RecordTypes.TaskProgress, ti.hm); + + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 734538) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; + import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.commons.lang.StringUtils; @@ -39,6 +40,8 @@ import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.history.HiveHistory; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.serde.ByteStream; import org.apache.hadoop.hive.conf.HiveConf; @@ -49,21 +52,21 @@ public class Driver implements CommandProcessor { static final private Log LOG = LogFactory.getLog("hive.ql.Driver"); - private int maxRows = 100; + private int maxRows = 100; ByteStream.Output bos = new ByteStream.Output(); - - private ParseDriver pd; - private HiveConf conf; - private DataInput resStream; - private LogHelper console; - private Context ctx; + + private ParseDriver pd; + private HiveConf conf; + private DataInput resStream; + private LogHelper console; + private Context ctx; private BaseSemanticAnalyzer sem; - + public int countJobs(List> tasks) { if (tasks == null) return 0; int jobs = 0; - for (Task task: tasks) { + for (Task task : tasks) { if (task.isMapRedTask()) { jobs++; } @@ -81,11 +84,12 @@ sem.setFetchTaskInit(true); sem.getFetchTask().initialize(conf); } - FetchTask ft = (FetchTask)sem.getFetchTask(); + FetchTask ft = (FetchTask) sem.getFetchTask(); tableDesc td = ft.getTblDesc(); String tableName = "result"; - List lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer()); + List lst = MetaStoreUtils.getFieldsFromDeserializer( + tableName, td.getDeserializer()); String schema = MetaStoreUtils.getDDLFromFieldSchema(tableName, lst); return schema; } @@ -111,7 +115,7 @@ return false; boolean hasReduce = false; - for (Task task: tasks) { + for (Task task : tasks) { if (task.hasReduce()) { return true; } @@ -121,10 +125,9 @@ return hasReduce; } - /** * for backwards compatibility with current tests - */ + */ public Driver(HiveConf conf) { console = new LogHelper(LOG); this.conf = conf; @@ -133,34 +136,55 @@ public Driver() { console = new LogHelper(LOG); - if(SessionState.get() != null) { + if (SessionState.get() != null) { conf = SessionState.get().getConf(); ctx = new Context(conf); } } + private String makeQueryId() { + GregorianCalendar gc = new GregorianCalendar(); + String userid = System.getProperty("user.name"); + + return userid + "_" + + String.format("%1$4d%2$02d%3$02d%4$02d%5$02d%5$02d", gc.get(Calendar.YEAR), + gc.get(Calendar.MONTH) + 1, + gc.get(Calendar.DAY_OF_MONTH), + gc.get(Calendar.HOUR_OF_DAY), + gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND)); + } + + public int run(String command) { - boolean noName = StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME)); + boolean noName = StringUtils.isEmpty(conf + .getVar(HiveConf.ConfVars.HADOOPJOBNAME)); int maxlen = 
conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); int jobs = 0; - conf.setVar(HiveConf.ConfVars.HIVEQUERYID, command); + conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, command); + + String queryId = makeQueryId(); + conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId); try { - + TaskFactory.resetId(); LOG.info("Starting command: " + command); ctx.clear(); ctx.makeScratchDir(); + + if (SessionState.get() != null) + SessionState.get().getHiveHistory().startQuery(command, conf.getVar(HiveConf.ConfVars.HIVEQUERYID) ); + resStream = null; - + pd = new ParseDriver(); ASTNode tree = pd.parse(command); - while((tree.getToken() == null) && (tree.getChildCount() > 0)) { - tree = (ASTNode)tree.getChild(0); + while ((tree.getToken() == null) && (tree.getChildCount() > 0)) { + tree = (ASTNode) tree.getChild(0); } sem = SemanticAnalyzerFactory.get(conf, tree); @@ -173,55 +197,71 @@ if (jobs > 0) { console.printInfo("Total MapReduce jobs = " + jobs); } - + if (SessionState.get() != null) + SessionState.get().getHiveHistory().setQueryProperty(queryId, + Keys.QUERY_NUM_TASKS, String.valueOf(jobs)); + boolean hasReduce = hasReduceTasks(sem.getRootTasks()); + if (hasReduce) { - console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS)); + console.printInfo("Number of reducers = " + + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS)); console.printInfo("In order to change numer of reducers use:"); console.printInfo(" set mapred.reduce.tasks = "); } String jobname = Utilities.abbreviate(command, maxlen - 6); int curJob = 0; - for(Task rootTask: sem.getRootTasks()) { + for (Task rootTask : sem.getRootTasks()) { // assumption that only top level tasks are map-reduce tasks if (rootTask.isMapRedTask()) { - curJob ++; - if(noName) { - conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob + "/" + jobs + ")"); + curJob++; + if (noName) { + conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob + + "/" + jobs + ")"); } } rootTask.initialize(conf); } // A very simple runtime that keeps putting runnable takss - // on a list and when a job completes, it puts the children at the back of the list + // on a list and when a job completes, it puts the children at the back of + // the list // while taking the job to run from the front of the list Queue> runnable = new LinkedList>(); - for(Task rootTask:sem.getRootTasks()) { + for (Task rootTask : sem.getRootTasks()) { if (runnable.offer(rootTask) == false) { LOG.error("Could not insert the first task into the queue"); return (1); } } - while(runnable.peek() != null) { + while (runnable.peek() != null) { Task tsk = runnable.remove(); + if (SessionState.get() != null) + SessionState.get().getHiveHistory().startTask(queryId, tsk, + tsk.getClass().getName()); + int exitVal = tsk.execute(); + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setTaskProperty(queryId, + tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal)); + SessionState.get().getHiveHistory().endTask(queryId, tsk); + } if (exitVal != 0) { - console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName()); + console.printError("FAILED: Execution Error, return code " + exitVal + + " from " + tsk.getClass().getName()); return 9; } - tsk.setDone(); if (tsk.getChildTasks() == null) { continue; } - for(Task child: tsk.getChildTasks()) { + for (Task child : tsk.getChildTasks()) { // Check if the child is runnable if (!child.isRunnable()) { continue; @@ -232,51 +272,66 @@ } } } + if 
(SessionState.get() != null) + SessionState.get().getHiveHistory().setQueryProperty(queryId, + Keys.QUERY_RET_CODE, String.valueOf(0)); } catch (SemanticException e) { - console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (SessionState.get() != null) + SessionState.get().getHiveHistory().setQueryProperty(queryId, + Keys.QUERY_RET_CODE, String.valueOf(10)); + console.printError("FAILED: Error in semantic analysis: " + + e.getMessage(), "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (10); } catch (ParseException e) { - console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (SessionState.get() != null) + SessionState.get().getHiveHistory().setQueryProperty(queryId, + Keys.QUERY_RET_CODE, String.valueOf(11)); + console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (11); } catch (Exception e) { - // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils - console.printError("FAILED: Unknown exception : " + e.getMessage(), - "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + if (SessionState.get() != null) + SessionState.get().getHiveHistory().setQueryProperty(queryId, + Keys.QUERY_RET_CODE, String.valueOf(12)); + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); return (12); } finally { - if(noName) { + if (SessionState.get() != null) + SessionState.get().getHiveHistory().endQuery(queryId); + if (noName) { conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, ""); - } + } } console.printInfo("OK"); return (0); } - - - public boolean getResults(Vector res) - { + + public boolean getResults(Vector res) { if (sem != null && sem.getFetchTask() != null) { if (!sem.getFetchTaskInit()) { sem.setFetchTaskInit(true); sem.getFetchTask().initialize(conf); } - FetchTask ft = (FetchTask)sem.getFetchTask(); + FetchTask ft = (FetchTask) sem.getFetchTask(); ft.setMaxRows(maxRows); return ft.fetch(res); } if (resStream == null) resStream = ctx.getStream(); - if (resStream == null) return false; - + if (resStream == null) + return false; + int numRows = 0; String row = null; - while (numRows < maxRows) - { - if (resStream == null) - { + while (numRows < maxRows) { + if (resStream == null) { if (numRows > 0) return true; else @@ -285,8 +340,7 @@ bos.reset(); Utilities.streamStatus ss; - try - { + try { ss = Utilities.readColumn(resStream, bos); if (bos.getCount() > 0) row = new String(bos.getData(), 0, bos.getCount(), "UTF-8"); @@ -298,12 +352,13 @@ res.add(row); } } catch (IOException e) { - console.printError("FAILED: Unexpected IO exception : " + e.getMessage()); + console.printError("FAILED: Unexpected IO exception : " + + e.getMessage()); res = null; return false; } - if (ss == Utilities.streamStatus.EOF) + if (ss == Utilities.streamStatus.EOF) resStream = ctx.getStream(); } return true; @@ -314,14 +369,12 @@ // Delete the scratch directory from the context ctx.removeScratchDir(); ctx.clear(); + } catch (Exception e) { + console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (13); } - catch (Exception e) { 
- console.printError("FAILED: Unknown exception : " + e.getMessage(), - "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return(13); - } - - return(0); + + return (0); } } -
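Taken together, the pieces in this patch give the following flow, which is essentially what TestHiveHistory exercises: starting a session opens the history file and writes a SessionStart record, Driver.run() appends QueryStart/TaskStart/TaskEnd/QueryEnd records as it executes, and HiveHistoryViewer reads the file back. A condensed, illustrative sketch (it assumes a working Hive configuration on the classpath and an existing table named src):

    import org.apache.hadoop.hive.cli.CliSessionState;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
    import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class HistoryFlowSketch {
      public static void main(String[] args) {
        // opens /tmp/<user>/hive_job_log_<sessionId>.txt and logs SessionStart
        SessionState.start(new CliSessionState(new HiveConf(SessionState.class)));

        Driver d = new Driver();
        int ret = d.run("select a.key from src a");

        // read the structured log back
        String histFile = SessionState.get().getHiveHistory().getHistFileName();
        HiveHistoryViewer hv = new HiveHistoryViewer(histFile);
        String queryId = (String) hv.getJobInfoMap().keySet().toArray()[0];
        System.out.println("driver returned " + ret + ", history QUERY_RET_CODE = "
            + hv.getJobInfoMap().get(queryId).hm.get(Keys.QUERY_RET_CODE.name()));
      }
    }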