Index: common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java (revision 0)
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+public interface HiveInterruptCallback {
+  /**
+   * Request interruption of the processing.
+   */
+  void interrupt();
+}
Index: common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java (revision 0)
+++ common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java (revision 0)
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HiveInterruptUtils {
+
+  /**
+   * A list of callbacks for the currently running commands that need cleanup when the command is canceled.
+   */
+  private static List<HiveInterruptCallback> interruptCallbacks = new ArrayList<HiveInterruptCallback>();
+
+  public static HiveInterruptCallback add(HiveInterruptCallback command) {
+    synchronized (interruptCallbacks) {
+      interruptCallbacks.add(command);
+    }
+    return command;
+  }
+
+  public static HiveInterruptCallback remove(HiveInterruptCallback command) {
+    synchronized (interruptCallbacks) {
+      interruptCallbacks.remove(command);
+    }
+    return command;
+  }
+
+  /**
+   * Request interruption of the current hive command.
+   */
+  public static void interrupt() {
+    synchronized (interruptCallbacks) {
+      for (HiveInterruptCallback resource : new ArrayList<HiveInterruptCallback>(interruptCallbacks)) {
+        resource.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Checks if the current thread has been interrupted and throws a RuntimeException if it has.
+   */
+  public static void checkInterrupted() {
+    if (Thread.currentThread().isInterrupted()) {
+      InterruptedException interrupt = null;
+      try {
+        Thread.sleep(0);
+      } catch (InterruptedException e) {
+        interrupt = e;
+      }
+      throw new RuntimeException("Interrupted", interrupt);
+    }
+  }
+}
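For context (this note and the sketch below are not part of the patch): the two new classes give long-running Hive operations a way to register cleanup work that should run when the user cancels a command. A minimal usage sketch, assuming a hypothetical cancellable object named scan whose run() and abort() methods stand in for the real work; the register/try/finally/remove shape mirrors what Utilities.getInputSummary() does later in this patch:

    // Hypothetical usage sketch; "scan" is not a real Hive type.
    HiveInterruptCallback callback = HiveInterruptUtils.add(new HiveInterruptCallback() {
      @Override
      public void interrupt() {
        scan.abort();                           // release whatever the work is blocked on
      }
    });
    try {
      scan.run();                               // the cancellable work
      HiveInterruptUtils.checkInterrupted();    // throws RuntimeException if Ctrl+C arrived
    } finally {
      HiveInterruptUtils.remove(callback);      // always unregister the callback
    }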
Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 1104622)
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy)
@@ -43,12 +43,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -61,6 +63,10 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.thrift.TException;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+
 /**
  * CliDriver.
  *
@@ -155,48 +161,48 @@
         }
       }
     } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
-        HiveClient client = ss.getClient();
-        PrintStream out = ss.out;
-        PrintStream err = ss.err;
+      HiveClient client = ss.getClient();
+      PrintStream out = ss.out;
+      PrintStream err = ss.err;
+      try {
+        client.execute(cmd_trimmed);
+        List<String> results;
+        do {
+          results = client.fetchN(LINES_TO_FETCH);
+          for (String line : results) {
+            out.println(line);
+          }
+        } while (results.size() == LINES_TO_FETCH);
+      } catch (HiveServerException e) {
+        ret = e.getErrorCode();
+        if (ret != 0) { // OK if ret == 0 -- reached the EOF
+          String errMsg = e.getMessage();
+          if (errMsg == null) {
+            errMsg = e.toString();
+          }
+          ret = e.getErrorCode();
+          err.println("[Hive Error]: " + errMsg);
+        }
+      } catch (TException e) {
+        String errMsg = e.getMessage();
+        if (errMsg == null) {
+          errMsg = e.toString();
+        }
+        ret = -10002;
+        err.println("[Thrift Error]: " + errMsg);
+      } finally {
        try {
-          client.execute(cmd_trimmed);
-          List<String> results;
-          do {
-            results = client.fetchN(LINES_TO_FETCH);
-            for (String line: results) {
-              out.println(line);
-            }
-          } while (results.size() == LINES_TO_FETCH);
-        } catch (HiveServerException e) {
-          ret = e.getErrorCode();
-          if (ret != 0) { // OK if ret == 0 -- reached the EOF
-            String errMsg = e.getMessage();
-            if (errMsg == null) {
-              errMsg = e.toString();
-            }
-            ret = e.getErrorCode();
-            err.println("[Hive Error]: " + errMsg);
-          }
+          client.clean();
        } catch (TException e) {
          String errMsg = e.getMessage();
          if (errMsg == null) {
            errMsg = e.toString();
          }
-          ret = -10002;
-          err.println("[Thrift Error]: " + errMsg);
-        } finally {
-          try {
-            client.clean();
-          } catch (TException e) {
-            String errMsg = e.getMessage();
-            if (errMsg == null) {
-              errMsg = e.toString();
-            }
-            err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
-                + errMsg);
-          }
+          err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+              + errMsg);
        }
+      }
     } else { // local mode
       CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
       int tryCount = 0;
@@ -284,32 +290,88 @@
   }
 
   public int processLine(String line) {
-    int lastRet = 0, ret = 0;
+    return processLine(line, false);
+  }
 
-    String command = "";
-    for (String oneCmd : line.split(";")) {
+  /**
+   * Processes a line of semicolon-separated commands.
+   *
+   * @param line
+   *          The commands to process
+   * @param allowInterrupting
+   *          When true the function will handle SIGINT (Ctrl+C) by interrupting the processing and
+   *          returning -1
+   * @return the return code of the last command executed
+   */
+  public int processLine(String line, boolean allowInterrupting) {
+    SignalHandler oldSignal = null;
+    Signal interruptSignal = null;
 
-      if (StringUtils.endsWith(oneCmd, "\\")) {
-        command += StringUtils.chop(oneCmd) + ";";
-        continue;
-      } else {
-        command += oneCmd;
+    if (allowInterrupting) {
+      // Remember all threads that were running at the time we started line processing.
+      // Hook up the custom Ctrl+C handler while processing this line
+      interruptSignal = new Signal("INT");
+      oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
+        private final Thread cliThread = Thread.currentThread();
+        private boolean interruptRequested;
+
+        @Override
+        public void handle(Signal signal) {
+          boolean initialRequest = !interruptRequested;
+          interruptRequested = true;
+
+          // Kill the VM on second ctrl+c
+          if (!initialRequest) {
+            console.printInfo("Exiting the JVM");
+            System.exit(127);
+          }
+
+          // Interrupt the CLI thread to stop the current statement and return
+          // to prompt
+          console.printInfo("Interrupting... Be patient, this might take some time.");
+          console.printInfo("Press Ctrl+C again to kill JVM");
+
+          // First, kill any running MR jobs
+          HadoopJobExecHelper.killRunningJobs();
+          HiveInterruptUtils.interrupt();
+          this.cliThread.interrupt();
+        }
+      });
+    }
+
+    try {
+      int lastRet = 0, ret = 0;
+
+      String command = "";
+      for (String oneCmd : line.split(";")) {
+
+        if (StringUtils.endsWith(oneCmd, "\\")) {
+          command += StringUtils.chop(oneCmd) + ";";
+          continue;
+        } else {
+          command += oneCmd;
+        }
+        if (StringUtils.isBlank(command)) {
+          continue;
+        }
+
+        ret = processCmd(command);
+        command = "";
+        lastRet = ret;
+        boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
+        if (ret != 0 && !ignoreErrors) {
+          CommandProcessorFactory.clean((HiveConf) conf);
+          return ret;
+        }
       }
-      if (StringUtils.isBlank(command)) {
-        continue;
+      CommandProcessorFactory.clean((HiveConf) conf);
+      return lastRet;
+    } finally {
+      // Once we are done processing the line, restore the old handler
+      if (oldSignal != null && interruptSignal != null) {
        Signal.handle(interruptSignal, oldSignal);
      }
-
-      ret = processCmd(command);
-      command = "";
-      lastRet = ret;
-      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
-      if (ret != 0 && !ignoreErrors) {
-        CommandProcessorFactory.clean((HiveConf)conf);
-        return ret;
-      }
     }
-    CommandProcessorFactory.clean((HiveConf)conf);
-    return lastRet;
   }
 
   public int processReader(BufferedReader r) throws IOException {
@@ -528,7 +590,7 @@
       }
       if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
         line = prefix + line;
-        ret = cli.processLine(line);
+        ret = cli.processLine(line, true);
         prefix = "";
         curPrompt = prompt;
       } else {
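The CliDriver change above is the heart of the patch: the first Ctrl+C interrupts the thread running the statement (after killing MapReduce jobs and firing the interrupt callbacks), and a second Ctrl+C exits the JVM. A self-contained sketch of that sun.misc.Signal pattern outside Hive (not part of the patch; sun.misc.Signal and SignalHandler are HotSpot-specific and not a supported public API):

    import sun.misc.Signal;
    import sun.misc.SignalHandler;

    public class SigintSketch {
      public static void main(String[] args) {
        final Thread mainThread = Thread.currentThread();
        Signal.handle(new Signal("INT"), new SignalHandler() {
          private boolean interruptRequested;

          public void handle(Signal signal) {
            if (interruptRequested) {
              System.exit(127);         // second Ctrl+C: give up and kill the JVM
            }
            interruptRequested = true;
            mainThread.interrupt();     // first Ctrl+C: ask the main thread to stop
          }
        });
        try {
          Thread.sleep(60000);          // stands in for processCmd()
        } catch (InterruptedException e) {
          System.out.println("interrupted, back to prompt");
        }
      }
    }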
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1104622)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -158,29 +158,33 @@
       Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
-          synchronized (runningJobKillURIs) {
-            for (String uri : runningJobKillURIs.values()) {
-              try {
-                System.err.println("killing job with: " + uri);
-                java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-                    .openConnection();
-                conn.setRequestMethod("POST");
-                int retCode = conn.getResponseCode();
-                if (retCode != 200) {
-                  System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                      + retCode);
-                }
-              } catch (Exception e) {
-                System.err.println("trying to kill job, caught: " + e);
-                // do nothing
-              }
-            }
-          }
+          killRunningJobs();
         }
       });
     }
   }
-
+
+  public static void killRunningJobs() {
+    synchronized (runningJobKillURIs) {
+      for (String uri : runningJobKillURIs.values()) {
+        try {
+          System.err.println("killing job with: " + uri);
+          java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
+              .openConnection();
+          conn.setRequestMethod("POST");
+          int retCode = conn.getResponseCode();
+          if (retCode != 200) {
+            System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+                + retCode);
+          }
+        } catch (Exception e) {
+          System.err.println("trying to kill job, caught: " + e);
+          // do nothing
+        }
+      }
+    }
+  }
+
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     if (ctrs == null) {
       // hadoop might return null if it cannot locate the job.
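With killRunningJobs() extracted into a public static method, the same cleanup is reachable from two places: the JVM shutdown hook registered above and the CLI's Ctrl+C handler. Roughly, the interrupt path introduced by this patch composes as follows (condensed from the CliDriver handler shown earlier; cliThread is the thread executing the statement):

    HadoopJobExecHelper.killRunningJobs();   // POST to each tracked job's kill URI
    HiveInterruptUtils.interrupt();          // fire the registered HiveInterruptCallbacks
    cliThread.interrupt();                   // unblock the CLI thread so it returns to the prompt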
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1104622)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -85,6 +85,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.HiveInterruptCallback;
+import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -1619,99 +1621,98 @@
     // Process the case when name node call is needed
     final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
     ArrayList<Future<?>> results = new ArrayList<Future<?>>();
-    ThreadPoolExecutor executor = null;
+    final ThreadPoolExecutor executor;
     int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
     if (pathNeedProcess.size() > 1 && maxThreads > 1) {
       int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
       LOG.info("Using " + numExecutors + " threads for getContentSummary");
       executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
           new LinkedBlockingQueue<Runnable>());
+    } else {
+      executor = null;
     }
-    //
-    Configuration conf = ctx.getConf();
-    JobConf jobConf = new JobConf(conf);
-    for (String path : pathNeedProcess) {
-      final Path p = new Path(path);
-      final String pathStr = path;
-      // All threads share the same Configuration and JobConf based on the
-      // assumption that they are thread safe if only read operations are
-      // executed. It is not stated in Hadoop's javadoc, the sourcce codes
-      // clearly showed that they made efforts for it and we believe it is
-      // thread safe. Will revisit this piece of codes if we find the assumption
-      // is not correct.
-      final Configuration myConf = conf;
-      final JobConf myJobConf = jobConf;
-      final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
-          p.toString());
-      Runnable r = new Runnable() {
-        public void run() {
-          try {
-            ContentSummary resultCs;
+    HiveInterruptCallback interruptCallback = HiveInterruptUtils.add(new HiveInterruptCallback() {
+      @Override
+      public void interrupt() {
+        if (executor != null) {
+          executor.shutdownNow();
+        }
+      }
+    });
+    try {
+      Configuration conf = ctx.getConf();
+      JobConf jobConf = new JobConf(conf);
+      for (String path : pathNeedProcess) {
+        final Path p = new Path(path);
+        final String pathStr = path;
+        // All threads share the same Configuration and JobConf based on the
+        // assumption that they are thread safe if only read operations are
+        // executed. It is not stated in Hadoop's javadoc, the source code
+        // clearly showed that they made efforts for it and we believe it is
+        // thread safe. Will revisit this piece of codes if we find the assumption
+        // is not correct.
+        final Configuration myConf = conf;
+        final JobConf myJobConf = jobConf;
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
+            p.toString());
+        Runnable r = new Runnable() {
+          public void run() {
+            try {
+              ContentSummary resultCs;
 
-            Class<? extends InputFormat> inputFormatCls = partDesc
-                .getInputFileFormatClass();
-            InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
-                inputFormatCls, myJobConf);
-            if (inputFormatObj instanceof ContentSummaryInputFormat) {
-              resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
-                  myJobConf);
-            } else {
-              FileSystem fs = p.getFileSystem(myConf);
-              resultCs = fs.getContentSummary(p);
+              Class<? extends InputFormat> inputFormatCls = partDesc
+                  .getInputFileFormatClass();
+              InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+                  inputFormatCls, myJobConf);
+              if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
+                    myJobConf);
+              } else {
+                FileSystem fs = p.getFileSystem(myConf);
+                resultCs = fs.getContentSummary(p);
+              }
+              resultMap.put(pathStr, resultCs);
+            } catch (IOException e) {
+              // We safely ignore this exception for summary data.
+              // We don't update the cache to protect it from polluting other
+              // usages. The worst case is that IOException will always be
+              // retried for another getInputSummary(), which is fine as
+              // IOException is not considered as a common case.
+              LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
            }
-            resultMap.put(pathStr, resultCs);
-          } catch (IOException e) {
-            // We safely ignore this exception for summary data.
-            // We don't update the cache to protect it from polluting other
-            // usages. The worst case is that IOException will always be
-            // retried for another getInputSummary(), which is fine as
-            // IOException is not considered as a common case.
Safely ignored."); } + }; + + if (executor == null) { + r.run(); + } else { + Future result = executor.submit(r); + results.add(result); } - }; - - if (executor == null) { - r.run(); - } else { - Future result = executor.submit(r); - results.add(result); } - } - if (executor != null) { - for (Future result : results) { - boolean executorDone = false; - do { - try { - result.get(); - executorDone = true; - } catch (InterruptedException e) { - LOG.info("Interrupted when waiting threads: ", e); - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - throw new IOException(e); - } - } while (!executorDone); + if (executor != null) { + executor.shutdown(); } - executor.shutdown(); - } + HiveInterruptUtils.checkInterrupted(); + for (Map.Entry entry : resultMap.entrySet()) { + ContentSummary cs = entry.getValue(); - for (Map.Entry entry : resultMap.entrySet()) { - ContentSummary cs = entry.getValue(); + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); - summary[0] += cs.getLength(); - summary[1] += cs.getFileCount(); - summary[2] += cs.getDirectoryCount(); + ctx.addCS(entry.getKey(), cs); + LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength() + + " file count: " + + cs.getFileCount() + " directory count: " + cs.getDirectoryCount()); + } - ctx.addCS(entry.getKey(), cs); - LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + cs.getLength() - + " file count: " - + cs.getFileCount() + " directory count: " + cs.getDirectoryCount()); + return new ContentSummary(summary[0], summary[1], summary[2]); + } finally { + HiveInterruptUtils.remove(interrup); } - - return new ContentSummary(summary[0], summary[1], summary[2]); } }