diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index be85a53..bf6b309 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -20,6 +20,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -72,8 +74,8 @@ private final transient LongWritable deserialize_error_count = new LongWritable(); private final transient LongWritable serialize_error_count = new LongWritable(); - transient Thread outThread = null; - transient Thread errThread = null; + transient StreamThread outThread = null; + transient StreamThread errThread = null; transient Process scriptPid = null; transient Configuration hconf; // Input to the script @@ -432,6 +434,8 @@ public void close(boolean abort) throws HiveException { throw new HiveException(ErrorMsg.SCRIPT_GENERIC_ERROR.getErrorCodedMsg(), scriptError); } // everything ok. try normal shutdown + IOUtils.closeStream(outThread); + IOUtils.closeStream(errThread); try { try { if (scriptOutWriter != null) { @@ -663,7 +667,7 @@ public void close() { } - class StreamThread extends Thread { + class StreamThread extends Thread implements Closeable { RecordReader in; StreamProcessor proc; @@ -697,14 +701,7 @@ public void run() { "\nCause: " + th.getCause()); LOG.warn(StringUtils.stringifyException(th)); } finally { - try { - if (in != null) { - in.close(); - } - } catch (Exception e) { - LOG.warn(name + ": error in closing .."); - LOG.warn(StringUtils.stringifyException(e)); - } + close(); try { if (null != proc) { @@ -715,6 +712,18 @@ public void run() { } } } + + public void close() { + try { + if (in != null) { + in.close(); + } + in = null; + } catch (Exception e) { + LOG.warn(name + ": error in closing .."); + LOG.warn(StringUtils.stringifyException(e)); + } + } } /**