Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1042922)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -317,7 +317,8 @@
     SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null),

     // Print column names in output
-    HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false);
+    HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false),
+    HIVEQUERYTIMEOUT("hive.query.timeout", 7*24*60*60*1000),
     ;
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 1043154)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -693,13 +693,17 @@
       return new CommandProcessorResponse(ret, errorMessage, SQLState);
     }

+    SessionState ss = SessionState.get();
+    ss.start_monitor(this, command, Thread.currentThread(), conf);
     ret = execute();
     if (ret != 0) {
       releaseLocks(ctx.getHiveLocks());
+      ss.remove_monitor(this);
       return new CommandProcessorResponse(ret, errorMessage, SQLState);
     }

     releaseLocks(ctx.getHiveLocks());
+    ss.remove_monitor(this);

     return new CommandProcessorResponse(ret);
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java	(revision 1042922)
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java	(working copy)
@@ -54,22 +54,26 @@
     } else if ("delete".equals(cmdl)) {
       return new DeleteResourceProcessor();
     } else if (!isBlank(cmd)) {
-      if (conf == null) {
-        return new Driver();
-      }
-
-      Driver drv = mapDrivers.get(conf);
-      if (drv == null) {
-        drv = new Driver();
-        mapDrivers.put(conf, drv);
-      }
-      drv.init();
-      return drv;
+      return getDriver(conf);
     }

     return null;
   }

+  private static CommandProcessor getDriver(HiveConf conf) {
+    if (conf == null) {
+      return new Driver();
+    }
+
+    Driver drv = mapDrivers.get(conf);
+    if (drv == null) {
+      drv = new Driver();
+      mapDrivers.put(conf, drv);
+    }
+    drv.init();
+    return drv;
+  }
+
   public static void clean(HiveConf conf) {
     Driver drv = mapDrivers.get(conf);
     if (drv != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(revision 1042922)
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java	(working copy)
@@ -28,7 +28,9 @@
 import java.util.GregorianCalendar;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;

 import org.apache.commons.lang.StringUtils;
@@ -38,8 +40,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.util.DosToUnix;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.PropertyConfigurator;
@@ -545,4 +549,84 @@
   public void setCommandType(String commandType) {
     this.commandType = commandType;
   }
+
+  private static MonitorThread monitorThread = null;
+  private static Map<CommandProcessor, ThreadProcessorContext> processorMonitorCxt = null;
+  static {
+    monitorThread = new MonitorThread();
+    processorMonitorCxt = new HashMap<CommandProcessor, ThreadProcessorContext>();
+    monitorThread.start();
+  }
+
+  static class ThreadProcessorContext {
+    HiveConf conf;
+    long processorStartTime;
+    long processorKillTime;
+    Thread runningThread;
+    String command;
+
+    public ThreadProcessorContext(Thread runningThread, HiveConf conf, String command) {
+      this.conf = conf;
+      this.processorStartTime = System.currentTimeMillis();
+      this.processorKillTime = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVEQUERYTIMEOUT);
+      if (this.processorKillTime > 0) {
+        this.processorKillTime = this.processorStartTime
+            + this.processorKillTime;
+      }
+      this.runningThread = runningThread;
+      this.command = command;
+    }
+  }
+
+  static class MonitorThread extends Thread {
+    public void run() {
+      while (true) {
+        // sleep 10 mins
+        try {
+          Thread.sleep(1000 * 60 * 10);
+        } catch (InterruptedException e) {
+        }
+        long currentTime = System.currentTimeMillis();
+        synchronized (processorMonitorCxt) {
+          Iterator<Map.Entry<CommandProcessor, ThreadProcessorContext>> iter = processorMonitorCxt
+              .entrySet().iterator();
+          while (iter.hasNext()) {
+            Map.Entry<CommandProcessor, ThreadProcessorContext> current = iter
+                .next();
+            ThreadProcessorContext cxt = current.getValue();
+            if (cxt.processorKillTime > 0
+                && (currentTime > cxt.processorKillTime)) {
+              _console.printError("Trying to kill query: \n " + cxt.command);
+              Thread runningThread = cxt.runningThread;
+              try {
+                runningThread.interrupt();
+              } catch (Exception e) {
+                _console.printError("Failed to kill: " + e.getMessage());
+                continue;
+              }
+              iter.remove();
+            }
+          }
+          if (processorMonitorCxt.size() == 0) {
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public void start_monitor(CommandProcessor processor, String command,
+      Thread runningThread, HiveConf conf) {
+    ThreadProcessorContext cxt = new ThreadProcessorContext(runningThread,
+        conf, command);
+    synchronized (processorMonitorCxt) {
+      processorMonitorCxt.put(processor, cxt);
+    }
+  }
+
+  public void remove_monitor(CommandProcessor processor) {
+    synchronized(processorMonitorCxt) {
+      processorMonitorCxt.remove(processor);
+    }
+  }
 }
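
A minimal usage sketch (assumed client code, not part of this patch): with the patch applied, Driver.run() registers and deregisters the query with SessionState's MonitorThread on its own, so a caller only needs to lower hive.query.timeout from its default of 7*24*60*60*1000 ms (seven days) if a shorter limit is wanted. The class name QueryTimeoutExample and the sample query are hypothetical.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

// Hypothetical example class, not part of the patch.
public class QueryTimeoutExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    // Override the seven-day default; the value is in milliseconds (here: one hour).
    conf.setLong("hive.query.timeout", 60L * 60 * 1000);
    SessionState.start(new SessionState(conf));

    // With this patch, Driver.run() calls start_monitor()/remove_monitor() itself, so the
    // session's MonitorThread interrupts this thread if the query outlives the timeout.
    Driver driver = new Driver(conf);
    driver.run("SELECT count(1) FROM src");
  }
}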