Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1503555)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -182,6 +182,8 @@
     SUBMITVIACHILD("hive.exec.submitviachild", false),
     SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
     ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
+    STREAMREPORTERPREFIX("stream.stderr.reporter.prefix", "reporter:"),
+    STREAMREPORTERENABLED("stream.stderr.reporter.enabled", true),
     COMPRESSRESULT("hive.exec.compress.output", false),
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java	(revision 1503555)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java	(working copy)
@@ -46,6 +46,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
@@ -532,22 +533,69 @@
     }
   }
 
+  class CounterStatusProcessor {
+
+    private final String reporterPrefix;
+    private final String counterPrefix;
+    private final String statusPrefix;
+    private final Reporter reporter;
+
+    CounterStatusProcessor(Configuration hconf, Reporter reporter){
+      this.reporterPrefix = HiveConf.getVar(hconf, HiveConf.ConfVars.STREAMREPORTERPREFIX);
+      this.counterPrefix = reporterPrefix + "counter:";
+      this.statusPrefix = reporterPrefix + "status:";
+      this.reporter = reporter;
+    }
+
+    private boolean process(String line) {
+      if (line.startsWith(reporterPrefix)){
+        if (line.startsWith(counterPrefix)){
+          incrCounter(line);
+        }
+        if (line.startsWith(statusPrefix)){
+          setStatus(line);
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private void incrCounter(String line) {
+      String trimmedLine = line.substring(counterPrefix.length()).trim();
+      String[] columns = trimmedLine.split(",");
+      if (columns.length == 3) {
+        try {
+          reporter.incrCounter(columns[0], columns[1], Long.parseLong(columns[2]));
+        } catch (NumberFormatException e) {
+          LOG.warn("Cannot parse counter increment '" + columns[2] +
+              "' from line " + line);
+        }
+      } else {
+        LOG.warn("Cannot parse counter line: " + line);
+      }
+    }
+
+    private void setStatus(String line) {
+      reporter.setStatus(line.substring(statusPrefix.length()).trim());
+    }
+  }
+
   /**
    * The processor for stderr stream.
-   *
-   * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
-   * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
-   * counters and status updates.
    */
   class ErrorStreamProcessor implements StreamProcessor {
     private long bytesCopied = 0;
     private final long maxBytes;
     private long lastReportTime;
+    private CounterStatusProcessor counterStatus;
 
     public ErrorStreamProcessor(int maxBytes) {
       this.maxBytes = maxBytes;
       lastReportTime = 0;
+      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.STREAMREPORTERENABLED)){
+        counterStatus = new CounterStatusProcessor(hconf, reporter);
+      }
     }
 
     public void processLine(Writable line) throws HiveException {
@@ -571,6 +619,14 @@
         reporter.progress();
       }
 
+      if (reporter != null) {
+        if (counterStatus != null) {
+          if (counterStatus.process(stringLine)) {
+            return;
+          }
+        }
+      }
+
       if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
         System.err.println(stringLine);
       }
@@ -659,7 +715,7 @@
       for (int i = 0; i < inArgs.length; i++) {
         finalArgv[wrapComponents.length + i] = inArgs[i];
       }
-      return (finalArgv);
+      return finalArgv;
     }
 
   // Code below shameless borrowed from Hadoop Streaming