diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index da02fa5..1d545e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1133,7 +1133,7 @@ public class Driver implements CommandProcessor {
         if (exitVal != 0) {
           if (tsk.ifRetryCmdWhenFail()) {
             if (running.size() != 0) {
-              taskCleanup();
+              taskCleanup(running);
             }
             // in case we decided to run everything in local mode, restore the
             // the jobtracker setting to its initial value
@@ -1172,7 +1172,7 @@ public class Driver implements CommandProcessor {
           SQLState = "08S01";
           console.printError(errorMessage);
           if (running.size() != 0) {
-            taskCleanup();
+            taskCleanup(running);
           }
           // in case we decided to run everything in local mode, restore the
           // the jobtracker setting to its initial value
@@ -1339,10 +1339,21 @@ public class Driver implements CommandProcessor {
 
   /**
-   * Cleans up remaining tasks in case of failure
+   * Cleans up remaining tasks in case of failure: shuts down every task that
+   * is still running, then clears the map. Replaces the former System.exit(9),
+   * which killed the whole embedding JVM (fatal for long-lived clients such
+   * as HiveServer).
+   *
+   * @param running map of in-flight task results to their runners
    */
-  public void taskCleanup() {
-    // The currently existing Shutdown hooks will be automatically called,
-    // killing the map-reduce processes.
-    // The non MR processes will be killed as well.
-    System.exit(9);
+  public void taskCleanup(Map<TaskResult, TaskRunner> running) {
+    for (Map.Entry<TaskResult, TaskRunner> entry : running.entrySet()) {
+      if (entry.getKey().isRunning()) {
+        // Isolate failures so one bad task cannot abort cleanup of the rest.
+        try {
+          entry.getValue().getTask().shutdown();
+        } catch (Exception e) {
+          console.printError("Exception on shutting down task: " + e);
+        }
+      }
+    }
+    running.clear();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index f612119..0a5da8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -102,6 +102,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
 
   protected static transient final Log LOG = LogFactory.getLog(ExecDriver.class);
 
+  // Handle to the submitted Hadoop job, kept so shutdown() can kill it.
+  // transient: runtime-only state that must not be serialized with the plan.
+  private transient RunningJob rj;
+
   /**
    * Constructor when invoked from QL.
    */
@@ -366,7 +370,6 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
       initializeFiles("tmpfiles", addedFiles);
     }
     int returnVal = 0;
-    RunningJob rj = null;
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
         HiveConf.ConfVars.HADOOPJOBNAME));
     if (noName) {
@@ -1008,4 +1011,20 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
   public void logPlanProgress(SessionState ss) throws IOException {
     ss.getHiveHistory().logPlanProgress(queryPlan);
   }
+
+  /**
+   * Kills the submitted Hadoop job, if any, after base-class shutdown.
+   */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (rj != null) {
+      try {
+        rj.killJob();
+      } catch (Exception e) {
+        LOG.warn("failed to kill job " + rj.getID(), e);
+      }
+      rj = null;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
index a3e40f7..219c65e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
@@ -68,6 +68,10 @@ public class MapRedTask extends ExecDriver implements Serializable {
   private transient long totalInputFileSize;
   private transient long totalInputNumFiles;
 
+  // Child JVM that runs the job when the task executes in a subprocess.
+  // transient: runtime-only state that must not be serialized with the plan.
+  private transient Process executor;
+
   public MapRedTask() {
     super();
   }
@@ -209,7 +213,6 @@ public class MapRedTask extends ExecDriver implements Serializable {
       }
 
       LOG.info("Executing: " + cmdLine);
-      Process executor = null;
 
       // Inherit Java system variables
       String hadoopOpts;
@@ -544,4 +547,16 @@ public class MapRedTask extends ExecDriver implements Serializable {
   public Operator<? extends Serializable> getReducer() {
     return getWork().getReducer();
   }
+
+  /**
+   * Destroys the child process, if any, after base-class shutdown.
+   */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (executor != null) {
+      executor.destroy();
+      executor = null;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
index d2b12a7..3df4e13 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
@@ -77,6 +77,10 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
   // will pass this context throught
   private final ExecMapperContext execContext = new ExecMapperContext();
 
+  // Child JVM that executes the local work in a subprocess.
+  // transient: runtime-only state that must not be serialized with the plan.
+  private transient Process executor;
+
   public MapredLocalTask() {
     super();
   }
@@ -156,7 +160,6 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
       }
 
       LOG.info("Executing: " + cmdLine);
-      Process executor = null;
 
      // Inherit Java system variables
      String hadoopOpts;
@@ -475,4 +478,15 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
     return StageType.MAPREDLOCAL;
   }
 
+  /**
+   * Destroys the child process, if any, after base-class shutdown.
+   */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (executor != null) {
+      executor.destroy();
+      executor = null;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index a4e59ca..4e20c89 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -507,4 +507,12 @@ public abstract class Task<T extends Serializable> implements Serializable, Node {
   public String getJobID() {
     return jobID;
   }
+
+  /**
+   * Hook for stopping in-flight work when the Driver cleans up after a
+   * failure. Subclasses kill submitted jobs or destroy child processes;
+   * the base implementation does nothing.
+   */
+  public void shutdown() {
+  }
 }
diff --git ql/src/test/queries/clientnegative/driver_suicide.q ql/src/test/queries/clientnegative/driver_suicide.q
new file mode 100644
--- /dev/null
+++ ql/src/test/queries/clientnegative/driver_suicide.q
@@ -0,0 +1,5 @@
+CREATE TABLE SAMPLETABLE(IP STRING , showtime BIGINT ) partitioned by (ds string,ipz int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\040';
+
+ALTER TABLE SAMPLETABLE add Partition(ds='sf') location '/user/hive/warehouse' Partition(ipz=100) location '/user/hive/warehouse';
+
+drop table SAMPLETABLE;
diff --git ql/src/test/results/clientnegative/driver_suicide.q.out ql/src/test/results/clientnegative/driver_suicide.q.out
new file mode 100644
index 0000000..80e0dc9
--- /dev/null
+++ ql/src/test/results/clientnegative/driver_suicide.q.out
@@ -0,0 +1,11 @@
+PREHOOK: query: CREATE TABLE SAMPLETABLE(IP STRING , showtime BIGINT ) partitioned by (ds string,ipz int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\040'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE SAMPLETABLE(IP STRING , showtime BIGINT ) partitioned by (ds string,ipz int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\040'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@SAMPLETABLE
+PREHOOK: query: ALTER TABLE SAMPLETABLE add Partition(ds='sf') location '/user/hive/warehouse' Partition(ipz=100) location '/user/hive/warehouse'
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Input: default@sampletable
+FAILED: Error in metadata: table is partitioned but partition spec is not specified or does not fully match table partitioning: {ds=sf}
+FAILED: Error in metadata: table is partitioned but partition spec is not specified or does not fully match table partitioning: {ipz=100}
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask