Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWIContextListener.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWIContextListener.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWIContextListener.java (working copy) @@ -7,43 +7,50 @@ import org.apache.commons.logging.LogFactory; /** - * After getting a contextInitialized event this component starts an - * instance of the HiveSessionManager. - * + * After getting a contextInitialized event this component starts an instance of + * the HiveSessionManager. + * */ public class HWIContextListener implements javax.servlet.ServletContextListener { - - protected static final Log l4j = LogFactory.getLog( HWIContextListener.class.getName() ); + + protected static final Log l4j = LogFactory.getLog(HWIContextListener.class + .getName()); + /** - * The Hive Web Interface manages multiple hive sessions. This event is used to - * start a Runnable, HiveSessionManager as a thread inside the servlet - * container. - * @param sce An event fired by the servlet context on startup - */ - public void contextInitialized(ServletContextEvent sce){ - ServletContext sc = sce.getServletContext(); - HWISessionManager hs = new HWISessionManager(); - l4j.debug("HWISessionManager created."); - Thread t = new Thread(hs); - t.start(); - l4j.debug("HWISessionManager thread started."); - sc.setAttribute("hs", hs); - l4j.debug("HWISessionManager placed in application context."); + * The Hive Web Interface manages multiple hive sessions. This event is used + * to start a Runnable, HiveSessionManager as a thread inside the servlet + * container. + * + * @param sce + * An event fired by the servlet context on startup + */ + public void contextInitialized(ServletContextEvent sce) { + ServletContext sc = sce.getServletContext(); + HWISessionManager hs = new HWISessionManager(); + l4j.debug("HWISessionManager created."); + Thread t = new Thread(hs); + t.start(); + l4j.debug("HWISessionManager thread started."); + sc.setAttribute("hs", hs); + l4j.debug("HWISessionManager placed in application context."); + } + + /** + * When the Hive Web Interface is closing we locate the Runnable + * HiveSessionManager and set it's internal goOn variable to false. This + * should allow the application to gracefully shutdown. + * + * @param sce + * An event fired by the servlet context on context shutdown + */ + public void contextDestroyed(ServletContextEvent sce) { + ServletContext sc = sce.getServletContext(); + HWISessionManager hs = (HWISessionManager) sc.getAttribute("hs"); + if (hs == null) { + l4j.error("HWISessionManager was not found in context"); + } else { + l4j.error("HWISessionManager goOn set to false. Shutting down."); + hs.setGoOn(false); } - /** - * When the Hive Web Interface is closing we locate the Runnable - * HiveSessionManager and set it's internal goOn variable to false. This - * should allow the application to gracefully shutdown. - * @param sce An event fired by the servlet context on context shutdown - */ - public void contextDestroyed(ServletContextEvent sce){ - ServletContext sc = sce.getServletContext(); - HWISessionManager hs = (HWISessionManager) sc.getAttribute("hs"); - if (hs ==null){ - l4j.error("HWISessionManager was not found in context"); - } else { - l4j.error("HWISessionManager goOn set to false. Shutting down."); - hs.setGoOn(false); - } - } + } } Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWIException.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWIException.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWIException.java (working copy) @@ -2,25 +2,25 @@ public class HWIException extends Exception { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - public HWIException() { - super(); - } + public HWIException() { + super(); + } - /** Specify an error String with the Exception */ - public HWIException(String arg0) { - super(arg0); - } + /** Specify an error String with the Exception */ + public HWIException(String arg0) { + super(arg0); + } - /** Wrap an Exception in HWIException */ - public HWIException(Throwable arg0) { - super(arg0); - } + /** Wrap an Exception in HWIException */ + public HWIException(Throwable arg0) { + super(arg0); + } - /** Specify an error String and wrap an Exception in HWIException */ - public HWIException(String arg0, Throwable arg1) { - super(arg0, arg1); - } + /** Specify an error String and wrap an Exception in HWIException */ + public HWIException(String arg0, Throwable arg1) { + super(arg0, arg1); + } } Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionManager.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionManager.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionManager.java (working copy) @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hive.hwi; -import java.util.*; +import java.util.Collection; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; /** * HiveSessionManager is a Runnable started inside a web application context. @@ -33,187 +36,187 @@ */ public class HWISessionManager implements Runnable { - protected static final Log l4j = LogFactory.getLog(HWISessionManager.class - .getName()); + protected static final Log l4j = LogFactory.getLog(HWISessionManager.class + .getName()); - private boolean goOn; - private TreeMap> items; + private boolean goOn; + private TreeMap> items; - protected HWISessionManager() { - goOn = true; - items = new TreeMap>(); - } + protected HWISessionManager() { + goOn = true; + items = new TreeMap>(); + } - /** - * This method scans the SessionItem collection. If a SessionItem is in the - * QUERY_SET state that signals that its thread should be started. If the - * SessionItem is in the DESTROY state it should be cleaned up and removed - * from the collection. Currently we are using a sleep. A wait/notify could be - * implemented. Queries will run for a long time, a one second wait on start - * will not be noticed. - * - */ - public void run() { - l4j.debug("Entered run() thread has started"); - while (goOn) { - l4j.debug("locking items"); - synchronized (items) { + /** + * This method scans the SessionItem collection. If a SessionItem is in the + * QUERY_SET state that signals that its thread should be started. If the + * SessionItem is in the DESTROY state it should be cleaned up and removed + * from the collection. Currently we are using a sleep. A wait/notify could be + * implemented. Queries will run for a long time, a one second wait on start + * will not be noticed. + * + */ + public void run() { + l4j.debug("Entered run() thread has started"); + while (goOn) { + l4j.debug("locking items"); + synchronized (items) { - for (HWIAuth a : items.keySet()) { - for (HWISessionItem i : items.get(a)) { - if (i.getStatus() == HWISessionItem.WebSessionItemStatus.DESTROY) { - items.get(a).remove(i); - } - if (i.getStatus() == HWISessionItem.WebSessionItemStatus.KILL_QUERY) { - l4j.debug("Killing item: " + i.getSessionName()); - i.killIt(); - l4j.debug("Killed item: " + i.getSessionName()); - items.get(a).remove(i); - } - } - } + for (HWIAuth a : items.keySet()) { + for (HWISessionItem i : items.get(a)) { + if (i.getStatus() == HWISessionItem.WebSessionItemStatus.DESTROY) { + items.get(a).remove(i); + } + if (i.getStatus() == HWISessionItem.WebSessionItemStatus.KILL_QUERY) { + l4j.debug("Killing item: " + i.getSessionName()); + i.killIt(); + l4j.debug("Killed item: " + i.getSessionName()); + items.get(a).remove(i); + } + } + } - } // end sync - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - l4j.error("Could not sleep ", ex); - } - } // end while - l4j.debug("goOn is false. Loop has ended."); - // Cleanup used here to stop all threads - synchronized (items) { - for (HWIAuth a : items.keySet()) { - for (HWISessionItem i : items.get(a)) { - try { - if (i.getStatus() == HWISessionItem.WebSessionItemStatus.QUERY_RUNNING) { - l4j.debug(i.getSessionName() + "Joining "); - i.runnable.join(1000); - l4j.debug(i.getSessionName() + "Joined "); - } - } catch (InterruptedException ex) { - l4j.error(i.getSessionName() + "while joining ", ex); - } - } - } - } - } // end run + } // end sync + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + l4j.error("Could not sleep ", ex); + } + } // end while + l4j.debug("goOn is false. Loop has ended."); + // Cleanup used here to stop all threads + synchronized (items) { + for (HWIAuth a : items.keySet()) { + for (HWISessionItem i : items.get(a)) { + try { + if (i.getStatus() == HWISessionItem.WebSessionItemStatus.QUERY_RUNNING) { + l4j.debug(i.getSessionName() + "Joining "); + i.runnable.join(1000); + l4j.debug(i.getSessionName() + "Joined "); + } + } catch (InterruptedException ex) { + l4j.error(i.getSessionName() + "while joining ", ex); + } + } + } + } + } // end run - protected boolean isGoOn() { - return goOn; - } + protected boolean isGoOn() { + return goOn; + } - protected void setGoOn(boolean goOn) { - this.goOn = goOn; - } + protected void setGoOn(boolean goOn) { + this.goOn = goOn; + } - protected TreeMap> getItems() { - return items; - } + protected TreeMap> getItems() { + return items; + } - protected void setItems(TreeMap> items) { - this.items = items; - } + protected void setItems(TreeMap> items) { + this.items = items; + } - // client methods called from JSP - /** - * Rather then return the actual items we return a list copies. This enforces - * our HWISessionManager by preventing the ability of the client(jsp) to - * create SessionItems. - * - * @return A set of SessionItems this framework manages - */ - public Vector findAllSessionItems() { - Vector otherItems = new Vector(); - for (HWIAuth a : items.keySet()) { - otherItems.addAll(items.get(a)); - } - return otherItems; - } + // client methods called from JSP + /** + * Rather then return the actual items we return a list copies. This enforces + * our HWISessionManager by preventing the ability of the client(jsp) to + * create SessionItems. + * + * @return A set of SessionItems this framework manages + */ + public Vector findAllSessionItems() { + Vector otherItems = new Vector(); + for (HWIAuth a : items.keySet()) { + otherItems.addAll(items.get(a)); + } + return otherItems; + } - /** - * Here we handle creating the SessionItem, we do this for the JSP client - * because we need to set parameters the client is not aware of. One such - * parameter is the command line arguments the server was started with. - * - * @param a - * Authenticated user - * @param sessionName - * Represents the session name - * @return a new SessionItem or null if a session with that name already - * exists - */ - public HWISessionItem createSession(HWIAuth a, String sessionName) { + /** + * Here we handle creating the SessionItem, we do this for the JSP client + * because we need to set parameters the client is not aware of. One such + * parameter is the command line arguments the server was started with. + * + * @param a + * Authenticated user + * @param sessionName + * Represents the session name + * @return a new SessionItem or null if a session with that name already + * exists + */ + public HWISessionItem createSession(HWIAuth a, String sessionName) { - l4j.debug("Creating session: " + sessionName); + l4j.debug("Creating session: " + sessionName); - HWISessionItem si = null; + HWISessionItem si = null; - synchronized (this.items) { - if (findSessionItemByName(a, sessionName) == null) { - l4j.debug("Initializing session: " + sessionName + " a for " - + a.getUser()); - si = new HWISessionItem(a,sessionName); + synchronized (items) { + if (findSessionItemByName(a, sessionName) == null) { + l4j.debug("Initializing session: " + sessionName + " a for " + + a.getUser()); + si = new HWISessionItem(a, sessionName); - if (!items.containsKey(a)) { - l4j.debug("SessionList is empty " + a.getUser()); - TreeSet list = new TreeSet(); - list.add(si); - items.put(a, list); - l4j.debug("Item added " + si.getSessionName() + " for user " - + a.getUser()); - } else { - items.get(a).add(si); - l4j.debug("Item added " + si.getSessionName() + " for user " - + a.getUser()); - } + if (!items.containsKey(a)) { + l4j.debug("SessionList is empty " + a.getUser()); + TreeSet list = new TreeSet(); + list.add(si); + items.put(a, list); + l4j.debug("Item added " + si.getSessionName() + " for user " + + a.getUser()); + } else { + items.get(a).add(si); + l4j.debug("Item added " + si.getSessionName() + " for user " + + a.getUser()); + } - } else { - l4j.debug("Creating session: " + sessionName + " already exists " - + a.getUser()); - } - } - return si; - } + } else { + l4j.debug("Creating session: " + sessionName + " already exists " + + a.getUser()); + } + } + return si; + } - /** - * Helper method useful when you know the session name you wish to reference. - * - * @param sessionname - * @return A SessionItem matching the sessionname or null if it does not - * exists - */ - public HWISessionItem findSessionItemByName(HWIAuth auth, String sessionname) { - Collection sessForUser = this.items.get(auth); - if (sessForUser == null) { - return null; - } - for (HWISessionItem si : sessForUser) { - if (si.getSessionName().equals(sessionname)) { - return si; - } - } - return null; - } + /** + * Helper method useful when you know the session name you wish to reference. + * + * @param sessionname + * @return A SessionItem matching the sessionname or null if it does not + * exists + */ + public HWISessionItem findSessionItemByName(HWIAuth auth, String sessionname) { + Collection sessForUser = items.get(auth); + if (sessForUser == null) { + return null; + } + for (HWISessionItem si : sessForUser) { + if (si.getSessionName().equals(sessionname)) { + return si; + } + } + return null; + } - /** - * Used to list all users that have at least one session - * - * @return keySet of items all users that have any sessions - */ - public Set findAllUsersWithSessions() { - return items.keySet(); - } + /** + * Used to list all users that have at least one session + * + * @return keySet of items all users that have any sessions + */ + public Set findAllUsersWithSessions() { + return items.keySet(); + } - /** - * Used to list all the sessions of a user - * - * @param auth - * the user being enquired about - * @return all the sessions of that user - */ - public Set findAllSessionsForUser(HWIAuth auth) { - return this.items.get(auth); - } + /** + * Used to list all the sessions of a user + * + * @param auth + * the user being enquired about + * @return all the sessions of that user + */ + public Set findAllSessionsForUser(HWIAuth auth) { + return items.get(auth); + } } Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWIAuth.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWIAuth.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWIAuth.java (working copy) @@ -1,76 +1,87 @@ package org.apache.hadoop.hive.hwi; + /** - * Represents an authenticated user. This class is stored in the users session. It is also used - * as a key for the HiveSessionManager + * Represents an authenticated user. This class is stored in the users session. + * It is also used as a key for the HiveSessionManager */ -public class HWIAuth implements Comparable{ - private String user; - private String[] groups; +public class HWIAuth implements Comparable { + private String user; + private String[] groups; - public HWIAuth() { + public HWIAuth() { - } + } - public String getUser() { - return user; - } + public String getUser() { + return user; + } - public void setUser(String user) { - this.user = user; - } + public void setUser(String user) { + this.user = user; + } - public String[] getGroups() { - return groups; - } + public String[] getGroups() { + return groups; + } - public void setGroups(String[] groups) { - this.groups = groups; - } - /** - * HWIAuth is used in SortedSets(s) the compartTo method is required - * @return chained call to String.compareTo based on user property - */ - public int compareTo(Object obj){ - if (obj == null){ - return -1; - } - if (! (obj instanceof HWIAuth)){ - return -1; - } - HWIAuth o = (HWIAuth) obj; - return o.getUser().compareTo(this.user); - } + public void setGroups(String[] groups) { + this.groups = groups; + } - /** HWIAuth is used in Map(s) the hashCode method is required - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((user == null) ? 0 : user.hashCode()); - return result; - } + /** + * HWIAuth is used in SortedSets(s) the compartTo method is required + * + * @return chained call to String.compareTo based on user property + */ + public int compareTo(Object obj) { + if (obj == null) { + return -1; + } + if (!(obj instanceof HWIAuth)) { + return -1; + } + HWIAuth o = (HWIAuth) obj; + return o.getUser().compareTo(user); + } - /** - * HWIAuth is used in Map(s) the equals method is required - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (!(obj instanceof HWIAuth)) - return false; - HWIAuth other = (HWIAuth) obj; - if (user == null) { - if (other.user != null) - return false; - } else if (!user.equals(other.user)) - return false; - return true; - } - + /** + * HWIAuth is used in Map(s) the hashCode method is required + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((user == null) ? 0 : user.hashCode()); + return result; + } + + /** + * HWIAuth is used in Map(s) the equals method is required + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof HWIAuth)) { + return false; + } + HWIAuth other = (HWIAuth) obj; + if (user == null) { + if (other.user != null) { + return false; + } + } else if (!user.equals(other.user)) { + return false; + } + return true; + } + } \ No newline at end of file Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionItem.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionItem.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWISessionItem.java (working copy) @@ -22,25 +22,21 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; -import java.util.Collection; -import java.util.Vector; import java.util.ArrayList; import java.util.List; +import java.util.Vector; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.cli.OptionsProcessor; -import org.apache.hadoop.hive.ql.processors.CommandProcessor; -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; +import org.apache.hadoop.hive.ql.processors.CommandProcessor; +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.conf.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * HWISessionItem can be viewed as a wrapper for a Hive shell. With it the user * has a session on the web server rather then in a console window. @@ -48,85 +44,94 @@ */ public class HWISessionItem implements Runnable, Comparable { - protected static final Log l4j = LogFactory.getLog(HWISessionItem.class.getName()); + protected static final Log l4j = LogFactory.getLog(HWISessionItem.class + .getName()); - /** Represents the state a session item can be in. */ + /** Represents the state a session item can be in. */ public enum WebSessionItemStatus { NEW, READY, QUERY_SET, QUERY_RUNNING, DESTROY, KILL_QUERY }; /** The Web Interface sessionName this is used to identify the session */ - private String sessionName; - - /** Respresents the current status of the session. Used by components to determine state. - * Operations will throw exceptions if the item is not in the correct state. */ + private final String sessionName; + + /** + * Respresents the current status of the session. Used by components to + * determine state. Operations will throw exceptions if the item is not in the + * correct state. + */ private HWISessionItem.WebSessionItemStatus status; private CliSessionState ss; - + /** Standard out from the session will be written to this local file */ private String resultFile; - + /** Standard error from the session will be written to this local file */ private String errorFile; - /** The results from the Driver. This is used for storing the most result - results from the driver in memory */ - private Vector>resultBucket; + /** + * The results from the Driver. This is used for storing the most result + * results from the driver in memory + */ + private Vector> resultBucket; /** Limits the resultBucket to be no greater then this size */ private int resultBucketMaxSize; /** List of queries that this item should/has operated on */ - private List queries; + private List queries; /** status code results of queries */ private List queryRet; /** Reference to the configuration */ private HiveConf conf; - + /** User privileges */ private HWIAuth auth; public Thread runnable; - /** Threading SessionState issues require us to capture a reference to - * the hive history file and store it*/ + /** + * Threading SessionState issues require us to capture a reference to the hive + * history file and store it + */ private String historyFile; /** - * Creates an instance of WebSessionItem, sets status to NEW. - */ + * Creates an instance of WebSessionItem, sets status to NEW. + */ public HWISessionItem(HWIAuth auth, String sessionName) { - this.auth=auth; - this.sessionName=sessionName; + this.auth = auth; + this.sessionName = sessionName; l4j.debug("HWISessionItem created"); status = WebSessionItemStatus.NEW; queries = new ArrayList(); queryRet = new ArrayList(); resultBucket = new Vector>(); - resultBucketMaxSize=1000; + resultBucketMaxSize = 1000; runnable = new Thread(this); runnable.start(); - + l4j.debug("Wait for NEW->READY transition"); - synchronized (this.runnable){ - if (this.status != WebSessionItemStatus.READY) { - try { - this.runnable.wait(); - } catch (Exception ex) {} + synchronized (runnable) { + if (status != WebSessionItemStatus.READY) { + try { + runnable.wait(); + } catch (Exception ex) { + } } } l4j.debug("NEW->READY transition complete"); } /** - * This is the initialization process that is carried out for each - * SessionItem. The goal is to emulate the startup of CLIDriver. - */ + * This is the initialization process that is carried out for each + * SessionItem. The goal is to emulate the startup of CLIDriver. + */ private void itemInit() { - l4j.debug("HWISessionItem itemInit start " + this.getSessionName()); + l4j.debug("HWISessionItem itemInit start " + getSessionName()); OptionsProcessor oproc = new OptionsProcessor(); if (System.getProperty("hwi-args") != null) { @@ -140,293 +145,303 @@ ss = new CliSessionState(conf); SessionState.start(ss); queries.add("set hadoop.job.ugi=" + auth.getUser() + "," - + auth.getGroups()[0]); - queries.add("set user.name="+auth.getUser() ); + + auth.getGroups()[0]); + queries.add("set user.name=" + auth.getUser()); /* - * HiveHistoryFileName will not be accessible outside this thread. We must - * capture this now. - */ - this.historyFile = this.ss.get().getHiveHistory().getHistFileName(); - l4j.debug("HWISessionItem itemInit Complete " + this.getSessionName()); - this.status= WebSessionItemStatus.READY; + * HiveHistoryFileName will not be accessible outside this thread. We must + * capture this now. + */ + historyFile = ss.get().getHiveHistory().getHistFileName(); + l4j.debug("HWISessionItem itemInit Complete " + getSessionName()); + status = WebSessionItemStatus.READY; - synchronized (this.runnable){ - this.runnable.notifyAll(); + synchronized (runnable) { + runnable.notifyAll(); } } /** - * HWISessionItem is a Runnable instance. Calling this method will change the - * status to QUERY_SET and notify(). The run method detects this and then - * continues processing. - */ + * HWISessionItem is a Runnable instance. Calling this method will change the + * status to QUERY_SET and notify(). The run method detects this and then + * continues processing. + */ public void clientStart() throws HWIException { - if (this.status == WebSessionItemStatus.QUERY_RUNNING) { + if (status == WebSessionItemStatus.QUERY_RUNNING) { throw new HWIException("Query already running"); } - this.status = WebSessionItemStatus.QUERY_SET; - synchronized (this.runnable) { - this.runnable.notifyAll(); + status = WebSessionItemStatus.QUERY_SET; + synchronized (runnable) { + runnable.notifyAll(); } - l4j.debug(this.getSessionName() + " Query is set to start"); + l4j.debug(getSessionName() + " Query is set to start"); } public void clientKill() throws HWIException { - if (this.status != WebSessionItemStatus.QUERY_RUNNING) { + if (status != WebSessionItemStatus.QUERY_RUNNING) { throw new HWIException("Can not kill that which is not running."); } - this.status = WebSessionItemStatus.KILL_QUERY; - l4j.debug(this.getSessionName() + " Query is set to KILL_QUERY"); + status = WebSessionItemStatus.KILL_QUERY; + l4j.debug(getSessionName() + " Query is set to KILL_QUERY"); } /** This method clears the private member variables. */ public void clientRenew() throws HWIException { throwIfRunning(); - this.queries = new ArrayList(); - this.queryRet = new ArrayList(); - this.resultBucket = new Vector>(); - this.resultFile = null; - this.errorFile = null; - //this.conf = null; - //this.ss = null; - this.status = WebSessionItemStatus.NEW; - l4j.debug(this.getSessionName() + " Query is renewed to start"); + queries = new ArrayList(); + queryRet = new ArrayList(); + resultBucket = new Vector>(); + resultFile = null; + errorFile = null; + // this.conf = null; + // this.ss = null; + status = WebSessionItemStatus.NEW; + l4j.debug(getSessionName() + " Query is renewed to start"); } - /** - * This is a callback style function used by the HiveSessionManager. The - * HiveSessionManager notices this and attempts to stop the query. - */ - protected void killIt() { - l4j.debug(this.getSessionName() + " Attempting kill."); - if (this.runnable != null) { - try { - this.runnable.join(1000); - l4j.debug(this.getSessionName() + " Thread join complete"); - } catch (InterruptedException e) { - l4j.error(this.getSessionName() + " killing session caused exception ", - e); - } - } - } + /** + * This is a callback style function used by the HiveSessionManager. The + * HiveSessionManager notices this and attempts to stop the query. + */ + protected void killIt() { + l4j.debug(getSessionName() + " Attempting kill."); + if (runnable != null) { + try { + runnable.join(1000); + l4j.debug(getSessionName() + " Thread join complete"); + } catch (InterruptedException e) { + l4j.error(getSessionName() + " killing session caused exception ", e); + } + } + } - /** - * Helper function to get configuration variables - * - * @param wanted - * a ConfVar - * @return Value of the configuration variable. - */ - public String getHiveConfVar(HiveConf.ConfVars wanted) throws HWIException { - String result = null; - try { - result = this.ss.getConf().getVar(wanted); - } catch (Exception ex) { - throw new HWIException(ex); - } - return result; - } - - public String getHiveConfVar(String s) throws HWIException{ - String result=null; - try { - result = conf.get(s); - } catch (Exception ex) { - throw new HWIException(ex); - } - return result; - } - /* - * mapred.job.tracker could be host:port or just local - * mapred.job.tracker.http.address could be host:port or just host - * In some configurations http.address is set to 0.0.0.0 we are combining the two - * variables to provide a url to the job tracker WUI if it exists. If hadoop chose - * the first available port for the JobTracker HTTP port will can not determine it. - */ - public String getJobTrackerURL(String jobid) throws HWIException{ - String jt = this.getHiveConfVar( "mapred.job.tracker" ); - String jth = this.getHiveConfVar( "mapred.job.tracker.http.address" ); - String [] jtparts = null; - String [] jthttpParts = null; - if (jt.equalsIgnoreCase("local")){ - jtparts = new String [2]; - jtparts [0]="local"; - jtparts [1]=""; - } else { - jtparts = jt.split(":"); - } - if (jth.contains(":")) { - jthttpParts = jth.split(":"); - } else { - jthttpParts = new String [2]; - jthttpParts [0] = jth; - jthttpParts [1] = ""; - } - return jtparts[0]+":"+jthttpParts[1]+"/jobdetails.jsp?jobid="+jobid+"&refresh=30"; - } + /** + * Helper function to get configuration variables + * + * @param wanted + * a ConfVar + * @return Value of the configuration variable. + */ + public String getHiveConfVar(HiveConf.ConfVars wanted) throws HWIException { + String result = null; + try { + result = ss.getConf().getVar(wanted); + } catch (Exception ex) { + throw new HWIException(ex); + } + return result; + } + + public String getHiveConfVar(String s) throws HWIException { + String result = null; + try { + result = conf.get(s); + } catch (Exception ex) { + throw new HWIException(ex); + } + return result; + } + + /* + * mapred.job.tracker could be host:port or just local + * mapred.job.tracker.http.address could be host:port or just host In some + * configurations http.address is set to 0.0.0.0 we are combining the two + * variables to provide a url to the job tracker WUI if it exists. If hadoop + * chose the first available port for the JobTracker HTTP port will can not + * determine it. + */ + public String getJobTrackerURL(String jobid) throws HWIException { + String jt = this.getHiveConfVar("mapred.job.tracker"); + String jth = this.getHiveConfVar("mapred.job.tracker.http.address"); + String[] jtparts = null; + String[] jthttpParts = null; + if (jt.equalsIgnoreCase("local")) { + jtparts = new String[2]; + jtparts[0] = "local"; + jtparts[1] = ""; + } else { + jtparts = jt.split(":"); + } + if (jth.contains(":")) { + jthttpParts = jth.split(":"); + } else { + jthttpParts = new String[2]; + jthttpParts[0] = jth; + jthttpParts[1] = ""; + } + return jtparts[0] + ":" + jthttpParts[1] + "/jobdetails.jsp?jobid=" + jobid + + "&refresh=30"; + } + @Override /* - * HWISessionItem uses a wait() notify() system. If the thread detects conf to - * be null, control is transfered to initItem(). A status of QUERY_SET causes - * control to transfer to the runQuery() method. DESTROY will cause the run - * loop to end permanently. - */ + * HWISessionItem uses a wait() notify() system. If the thread detects conf to + * be null, control is transfered to initItem(). A status of QUERY_SET causes + * control to transfer to the runQuery() method. DESTROY will cause the run + * loop to end permanently. + */ public void run() { - synchronized (this.runnable) { - while (this.status != HWISessionItem.WebSessionItemStatus.DESTROY) { - if (this.status == WebSessionItemStatus.NEW) { - this.itemInit(); - } - - if (this.status == WebSessionItemStatus.QUERY_SET) { - this.runQuery(); - } - - try { - this.runnable.wait(); - } catch (InterruptedException e) { - l4j.error("in wait() state ", e); - } - } //end while - } //end sync - } //end run + synchronized (runnable) { + while (status != HWISessionItem.WebSessionItemStatus.DESTROY) { + if (status == WebSessionItemStatus.NEW) { + itemInit(); + } + if (status == WebSessionItemStatus.QUERY_SET) { + runQuery(); + } + + try { + runnable.wait(); + } catch (InterruptedException e) { + l4j.error("in wait() state ", e); + } + } // end while + } // end sync + } // end run + /** - runQuery iterates the list of queries executing each query. - */ + * runQuery iterates the list of queries executing each query. + */ public void runQuery() { FileOutputStream fos = null; - if (this.getResultFile() != null) { + if (getResultFile() != null) { try { - fos = new FileOutputStream(new File(this.resultFile)); - ss.out = new PrintStream(fos, true, "UTF-8"); + fos = new FileOutputStream(new File(resultFile)); + ss.out = new PrintStream(fos, true, "UTF-8"); } catch (java.io.FileNotFoundException fex) { - l4j.error(this.getSessionName() + " opening resultfile " - + this.resultFile, fex); + l4j.error(getSessionName() + " opening resultfile " + resultFile, fex); } catch (java.io.UnsupportedEncodingException uex) { - l4j.error(this.getSessionName() + " opening resultfile " - + this.resultFile, uex); + l4j.error(getSessionName() + " opening resultfile " + resultFile, uex); } } else { - l4j.debug(this.getSessionName() + " Output file was not specified"); + l4j.debug(getSessionName() + " Output file was not specified"); } - l4j.debug(this.getSessionName() + " state is now QUERY_RUNNING."); - this.status = WebSessionItemStatus.QUERY_RUNNING; + l4j.debug(getSessionName() + " state is now QUERY_RUNNING."); + status = WebSessionItemStatus.QUERY_RUNNING; - //expect one return per query - queryRet = new ArrayList ( this.queries.size() ); - for (int i=0;i(queries.size()); + for (int i = 0; i < queries.size(); i++) { String cmd = queries.get(i); String cmd_trimmed = cmd.trim(); String[] tokens = cmd_trimmed.split("\\s+"); String cmd_1 = cmd_trimmed.substring(tokens[0].length()).trim(); CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); - if (proc !=null){ + if (proc != null) { if (proc instanceof Driver) { Driver qp = (Driver) proc; - queryRet.add ( new Integer(qp.run(cmd))); + queryRet.add(new Integer(qp.run(cmd))); Vector res = new Vector(); try { while (qp.getResults(res)) { - resultBucket.add(res); - if (resultBucket.size() > resultBucketMaxSize) - resultBucket.remove(0); + resultBucket.add(res); + if (resultBucket.size() > resultBucketMaxSize) { + resultBucket.remove(0); + } for (String row : res) { - if (ss != null) { - if (ss.out != null) - ss.out.println(row); - } else { - throw new RuntimeException ("ss was null" ); - } - } -// res.clear(); + if (ss != null) { + if (ss.out != null) { + ss.out.println(row); + } + } else { + throw new RuntimeException("ss was null"); + } + } + // res.clear(); } } catch (IOException ex) { - l4j.error(this.getSessionName() + " getting results " - + this.getResultFile() + " caused exception.", ex); + l4j.error(getSessionName() + " getting results " + getResultFile() + + " caused exception.", ex); } qp.close(); } else { - queryRet.add( new Integer(proc.run(cmd_1) ) ); + queryRet.add(new Integer(proc.run(cmd_1))); } } else { - //processor was null - l4j.error(this.getSessionName() + - " query processor was not found for query "+ cmd ); - } + // processor was null + l4j.error(getSessionName() + + " query processor was not found for query " + cmd); + } } // end for - //cleanup + // cleanup try { if (fos != null) { fos.close(); } } catch (IOException ex) { - l4j.error(this.getSessionName() + " closing result file " - + this.getResultFile() + " caused exception.", ex); + l4j.error(getSessionName() + " closing result file " + getResultFile() + + " caused exception.", ex); } - this.status = WebSessionItemStatus.READY; - l4j.debug(this.getSessionName() + " state is now READY"); - synchronized (this.runnable){ - this.runnable.notifyAll(); + status = WebSessionItemStatus.READY; + l4j.debug(getSessionName() + " state is now READY"); + synchronized (runnable) { + runnable.notifyAll(); } } - /** - * This is a chained call to SessionState.setIsSilent(). Use this if you do - * not want the result file to have information status - */ + /** + * This is a chained call to SessionState.setIsSilent(). Use this if you do + * not want the result file to have information status + */ public void setSSIsSilent(boolean silent) throws HWIException { - if (ss == null) + if (ss == null) { throw new HWIException("Session State is null"); - this.ss.setIsSilent(silent); - } + } + ss.setIsSilent(silent); + } - /** - * This is a chained call to SessionState.getIsSilent() - */ - public boolean getSSIsSilent() throws HWIException { - if (ss == null) - throw new HWIException("Session State is null"); - return ss.getIsSilent(); - } + /** + * This is a chained call to SessionState.getIsSilent() + */ + public boolean getSSIsSilent() throws HWIException { + if (ss == null) { + throw new HWIException("Session State is null"); + } + return ss.getIsSilent(); + } - /** to support sorting/Set*/ + /** to support sorting/Set */ public int compareTo(HWISessionItem other) { - if (other == null) + if (other == null) { return -1; - return this.getSessionName().compareTo(other.getSessionName()); + } + return getSessionName().compareTo(other.getSessionName()); } /** - * - * @return the HiveHistoryViewer for the session - * @throws HWIException - */ + * + * @return the HiveHistoryViewer for the session + * @throws HWIException + */ public HiveHistoryViewer getHistoryViewer() throws HWIException { - if (ss == null) + if (ss == null) { throw new HWIException("Session state was null"); + } /* - * we can not call this.ss.get().getHiveHistory().getHistFileName() directly - * as this call is made from a a Jetty thread and will return null - */ - HiveHistoryViewer hv = new HiveHistoryViewer(this.historyFile); + * we can not call this.ss.get().getHiveHistory().getHistFileName() directly + * as this call is made from a a Jetty thread and will return null + */ + HiveHistoryViewer hv = new HiveHistoryViewer(historyFile); return hv; } /** - * Uses the sessionName property to compare to sessions - * - * @return true if sessionNames are equal false otherwise - */ + * Uses the sessionName property to compare to sessions + * + * @return true if sessionNames are equal false otherwise + */ + @Override public boolean equals(Object other) { - if (other == null) + if (other == null) { return false; - if (!(other instanceof HWISessionItem)) + } + if (!(other instanceof HWISessionItem)) { return false; + } HWISessionItem o = (HWISessionItem) other; - if (this.getSessionName().equals(o.getSessionName())) { + if (getSessionName().equals(o.getSessionName())) { return true; } else { return false; @@ -442,114 +457,126 @@ } /** - * The session name is an identifier to recognize the session - * - * @return the session's name - */ + * The session name is an identifier to recognize the session + * + * @return the session's name + */ public String getSessionName() { return sessionName; } /** - * Used to represent to the user and other components what state the - * HWISessionItem is in. Certain commands can only be run when the application - * is in certain states. - * - * @return the current status of the session - */ + * Used to represent to the user and other components what state the + * HWISessionItem is in. Certain commands can only be run when the application + * is in certain states. + * + * @return the current status of the session + */ public WebSessionItemStatus getStatus() { return status; } /** - * Currently unused - * - * @return a String with the full path to the error file. - */ + * Currently unused + * + * @return a String with the full path to the error file. + */ public String getErrorFile() { return errorFile; } - /** - * Currently unused - * - * @param errorFile - * the full path to the file for results. - */ + /** + * Currently unused + * + * @param errorFile + * the full path to the file for results. + */ public void setErrorFile(String errorFile) { this.errorFile = errorFile; } /** - * @return the auth - */ + * @return the auth + */ public HWIAuth getAuth() { return auth; } /** - * @param auth the auth to set - */ + * @param auth + * the auth to set + */ protected void setAuth(HWIAuth auth) { this.auth = auth; } /** returns an unmodifiable list of queries */ - public List getQueries(){ - return java.util.Collections.unmodifiableList(this.queries); + public List getQueries() { + return java.util.Collections.unmodifiableList(queries); } - - /** adds a new query to the execution list - @param query query to be added to the list*/ + + /** + * adds a new query to the execution list + * + * @param query + * query to be added to the list + */ public void addQuery(String query) throws HWIException { throwIfRunning(); - this.queries.add(query); + queries.add(query); } - /** removes a query from the execution list - * @param item the 0 based index of the item to be removed - */ + /** + * removes a query from the execution list + * + * @param item + * the 0 based index of the item to be removed + */ public void removeQuery(int item) throws HWIException { throwIfRunning(); - queries.remove(item); + queries.remove(item); } - + public void clearQueries() throws HWIException { throwIfRunning(); queries.clear(); } /** returns the value for resultBucketMaxSize */ - public int getResultBucketMaxSize(){ + public int getResultBucketMaxSize() { return resultBucketMaxSize; } - /** sets the value for resultBucketMaxSize - @param size the new size - */ - public void setResultBucketMaxSize(int size){ - resultBucketMaxSize=size; + /** + * sets the value for resultBucketMaxSize + * + * @param size + * the new size + */ + public void setResultBucketMaxSize(int size) { + resultBucketMaxSize = size; } /** gets the value for resultBucket */ - public Vector> getResultBucket(){ + public Vector> getResultBucket() { return resultBucket; } /** - * The HWISessionItem stores the result of each query in an array - * @return unmodifiable list of return codes - */ - public List getQueryRet(){ + * The HWISessionItem stores the result of each query in an array + * + * @return unmodifiable list of return codes + */ + public List getQueryRet() { return java.util.Collections.unmodifiableList(queryRet); } /** - * If the ItemStatus is QueryRunning most of the configuration - * is in a read only state. - */ + * If the ItemStatus is QueryRunning most of the configuration is in a read + * only state. + */ private void throwIfRunning() throws HWIException { - if (this.status == WebSessionItemStatus.QUERY_RUNNING) { + if (status == WebSessionItemStatus.QUERY_RUNNING) { throw new HWIException("Query already running"); } } Index: hwi/src/java/org/apache/hadoop/hive/hwi/HWIServer.java =================================================================== --- hwi/src/java/org/apache/hadoop/hive/hwi/HWIServer.java (revision 901519) +++ hwi/src/java/org/apache/hadoop/hive/hwi/HWIServer.java (working copy) @@ -6,119 +6,123 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.mortbay.jetty.Server; - import org.apache.hadoop.hive.shims.JettyShims; import org.apache.hadoop.hive.shims.ShimLoader; /** - * This is the entry point for HWI. A web server is invoked in the same manner as the hive CLI. - * Rather then opening a command line session a web server is started and a web application to work with - * hive is started. + * This is the entry point for HWI. A web server is invoked in the same manner + * as the hive CLI. Rather then opening a command line session a web server is + * started and a web application to work with hive is started. */ public class HWIServer { - protected static final Log l4j = LogFactory.getLog( HWIServer.class.getName() ); - - private JettyShims.Server webServer; - private String [] args; - - /** - * - * @param args These are the command line arguments. Usually -hiveconf. - * @throws java.io.IOException - */ - public HWIServer(String [] args) throws IOException { - this.args = args; - } - /** - * This method initialized the internal Jetty Servlet Engine. It adds the hwi - * context path. - * @throws java.io.IOException Port already in use, bad bind etc. - */ - public void start() throws IOException { - - HiveConf conf = new HiveConf(this.getClass()); - - String listen = null; - int port = -1; - - listen = conf.getVar(HiveConf.ConfVars.HIVEHWILISTENHOST); - port = conf.getIntVar(HiveConf.ConfVars.HIVEHWILISTENPORT); - - if (listen.equals("")){ - l4j.warn("hive.hwi.listen.host was not specified defaulting to 0.0.0.0"); - listen="0.0.0.0"; - } - if (port ==-1){ - l4j.warn("hive.hwi.listen.port was not specified defaulting to 9999"); - port=9999; - } + protected static final Log l4j = LogFactory.getLog(HWIServer.class.getName()); + private JettyShims.Server webServer; + private final String[] args; - - String hwiWAR = conf.getVar(HiveConf.ConfVars.HIVEHWIWARFILE); - String hivehome = System.getenv().get("HIVE_HOME"); - File hwiWARFile = new File(hivehome,hwiWAR); - if (! hwiWARFile.exists() ){ - l4j.fatal("HWI WAR file not found at "+ hwiWAR ); - System.exit(1); - } - - - webServer = ShimLoader.getJettyShims().startServer(listen, port); - webServer.addWar(hwiWARFile.toString(), "/hwi"); - - /*The command line args may be used by multiple components. Rather by setting - * these as a system property we avoid having to specifically pass them - */ - StringBuffer sb = new StringBuffer(); - for (int i=0;i