Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(revision 954608)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(working copy)
@@ -100,6 +100,7 @@
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
+    int timeOut; // JT timeout in msec.
 
     public FSPaths() {
     }
@@ -109,6 +110,10 @@
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
+      // Timeout is chosen to make sure that even if one iteration takes more than
+      // half of the script.timeout but less than script.timeout, we will still
+      // be able to report progress.
+      timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000)/2;
     }
 
     /**
@@ -159,25 +164,49 @@
       return outWriters;
     }
 
+    /**
+     * Report status to JT so that JT won't kill this task if closing takes too long
+     * due to too many files to close and the NN is overloaded.
+     * @param lastUpdateTime the time (msec) that progress update happened.
+     * @return true if a new progress update is reported, false otherwise.
+     */
+    private boolean updateProgress(long lastUpdateTime) {
+      if (reporter != null &&
+          (System.currentTimeMillis() - lastUpdateTime) > timeOut) {
+        reporter.progress();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
     public void closeWriters(boolean abort) throws HiveException {
+      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
             outWriters[idx].close(abort);
+            if (updateProgress(lastProgressReport)) {
+              lastProgressReport = System.currentTimeMillis();
+            }
           } catch (IOException e) {
             throw new HiveException(e);
           }
         }
-      }
+      }
     }
 
     private void commit(FileSystem fs) throws HiveException {
+      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
           if (!fs.rename(outPaths[idx], finalPaths[idx])) {
             throw new HiveException("Unable to rename output to: "
                 + finalPaths[idx]);
           }
+          if (updateProgress(lastProgressReport)) {
+            lastProgressReport = System.currentTimeMillis();
+          }
         } catch (IOException e) {
           throw new HiveException(e + "Unable to rename output to: "
               + finalPaths[idx]);
@@ -186,6 +215,7 @@
     }
 
     public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
+      long lastProgressReport = System.currentTimeMillis();
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
           try {
@@ -193,6 +223,9 @@
             if (delete) {
               fs.delete(outPaths[idx], true);
             }
+            if (updateProgress(lastProgressReport)) {
+              lastProgressReport = System.currentTimeMillis();
+            }
           } catch (IOException e) {
             throw new HiveException(e);
           }