Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1504737)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -182,6 +182,8 @@
SUBMITVIACHILD("hive.exec.submitviachild", false),
SCRIPTERRORLIMIT("hive.exec.script.maxerrsize", 100000),
ALLOWPARTIALCONSUMP("hive.exec.script.allow.partial.consumption", false),
+ STREAMREPORTERPERFIX("stream.stderr.reporter.prefix", "reporter:"),
+ STREAMREPORTERENABLED("stream.stderr.reporter.enabled", true),
COMPRESSRESULT("hive.exec.compress.output", false),
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1504737)
+++ conf/hive-default.xml.template (working copy)
@@ -896,6 +896,18 @@
+ stream.stderr.reporter.prefix
+ reporter:
+ Streaming jobs that log to standard error with this prefix can log counter or status information.
+
+
+
+ stream.stderr.reporter.enabled
+ true
+ Enable consumption of status and counter messages for streaming jobs.
+
+
+
hive.script.recordwriter
org.apache.hadoop.hive.ql.exec.TextRecordWriter
The default record writer for writing data to the user scripts.
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (revision 1504737)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (working copy)
@@ -46,6 +46,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -532,22 +533,68 @@
}
}
+ class CounterStatusProcessor {
+
+ private final String reporterPrefix;
+ private final String counterPrefix;
+ private final String statusPrefix;
+ private final Reporter reporter;
+
+ CounterStatusProcessor(Configuration hconf, Reporter reporter){
+ this.reporterPrefix = HiveConf.getVar(hconf, HiveConf.ConfVars.STREAMREPORTERPERFIX);
+ this.counterPrefix = reporterPrefix + "counter:";
+ this.statusPrefix = reporterPrefix + "status:";
+ this.reporter = reporter;
+ }
+
+ private boolean process(String line) {
+ if (line.startsWith(reporterPrefix)){
+ if (line.startsWith(counterPrefix)){
+ incrCounter(line);
+ }
+ if (line.startsWith(statusPrefix)){
+ setStatus(line);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void incrCounter(String line) {
+ String trimmedLine = line.substring(counterPrefix.length()).trim();
+ String[] columns = trimmedLine.split(",");
+ if (columns.length == 3) {
+ try {
+ reporter.incrCounter(columns[0], columns[1], Long.parseLong(columns[2]));
+ } catch (NumberFormatException e) {
+ LOG.warn("Cannot parse counter increment '" + columns[2] +
+ "' from line " + line);
+ }
+ } else {
+ LOG.warn("Cannot parse counter line: " + line);
+ }
+ }
+
+ private void setStatus(String line) {
+ reporter.setStatus(line.substring(statusPrefix.length()).trim());
+ }
+ }
/**
* The processor for stderr stream.
- *
- * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
- * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
- * counters and status updates.
*/
class ErrorStreamProcessor implements StreamProcessor {
private long bytesCopied = 0;
private final long maxBytes;
-
private long lastReportTime;
+ private CounterStatusProcessor counterStatus;
public ErrorStreamProcessor(int maxBytes) {
this.maxBytes = maxBytes;
lastReportTime = 0;
+ if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.STREAMREPORTERENABLED)){
+ counterStatus = new CounterStatusProcessor(hconf, reporter);
+ }
}
public void processLine(Writable line) throws HiveException {
@@ -571,6 +618,14 @@
reporter.progress();
}
+ if (reporter != null) {
+ if (counterStatus != null) {
+ if (counterStatus.process(stringLine)) {
+ return;
+ }
+ }
+ }
+
if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
System.err.println(stringLine);
}
@@ -659,7 +714,7 @@
for (int i = 0; i < inArgs.length; i++) {
finalArgv[wrapComponents.length + i] = inArgs[i];
}
- return (finalArgv);
+ return finalArgv;
}
// Code below shameless borrowed from Hadoop Streaming