Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -109,8 +109,17 @@
   private StructField bucketField; // field bucket is in in record id
   private StructObjectInspector recIdInspector; // OI for inspecting record id
   private IntObjectInspector bucketInspector; // OI for inspecting bucket id
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;

  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT
+  }
+
+  /**
   * RecordWriter.
   *
   */
@@ -249,7 +258,7 @@
  private static final long serialVersionUID = 1L;
  protected transient FileSystem fs;
  protected transient Serializer serializer;
-  protected transient LongWritable row_count;
+  protected final transient LongWritable row_count = new LongWritable();
  private transient boolean isNativeTable = true;

  /**
@@ -352,7 +361,7 @@
        prtner = (HivePartitioner) ReflectionUtils.newInstance(
            jc.getPartitionerClass(), null);
      }
-      row_count = new LongWritable();
+
      if (dpCtx != null) {
        dpSetup();
      }
@@ -381,6 +390,13 @@
        bucketField = recIdInspector.getAllStructFieldRefs().get(1);
        bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
      }
+
+      String context = jc.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_OUT + context, row_count);
+
      initializeChildren(hconf);
    } catch (HiveException e) {
      throw e;
@@ -657,9 +673,9 @@
      fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
    }

-
-    if (row_count != null) {
-      row_count.set(row_count.get() + 1);
+    if (++numRows == cntr) {
+      cntr *= 10;
+      LOG.info(toString() + ": records written - " + numRows);
    }

    int writerOffset = findWriterOffset(row);
@@ -921,6 +937,9 @@

  @Override
  public void closeOp(boolean abort) throws HiveException {
+    row_count.set(numRows);
+    LOG.info(toString() + ": records written - " + numRows);
+
    if (!bDynParts && !filesCreated) {
      createBucketFiles(fsp);
    }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (working copy)
@@ -37,20 +37,8 @@
    Serializable {

  private static final long serialVersionUID = 1L;
-
-  /**
-   * Counter.
-   *
-   */
-  public static enum Counter {
-    FILTERED, PASSED
-  }
-
-  protected final transient LongWritable filtered_count;
-  protected final transient LongWritable passed_count;
  private transient ExprNodeEvaluator conditionEvaluator;
  private transient PrimitiveObjectInspector conditionInspector;
-  private transient int consecutiveFails;
  private transient int consecutiveSearches;
  private transient IOContext ioContext;
  protected transient int heartbeatInterval;
@@ -57,9 +45,6 @@

  public FilterOperator() {
    super();
-    filtered_count = new LongWritable();
-    passed_count = new LongWritable();
-    consecutiveFails = 0;
    consecutiveSearches = 0;
  }

@@ -73,8 +58,6 @@
        conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
      }

-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
      conditionInspector = null;
      ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
    } catch (Throwable e) {
@@ -131,21 +114,10 @@
      return;
    }

-    Boolean ret = (Boolean) conditionInspector
+    boolean ret = (Boolean) conditionInspector
        .getPrimitiveJavaObject(condition);
-    if (Boolean.TRUE.equals(ret)) {
+    if (ret) {
      forward(row, rowInspector);
-      passed_count.set(passed_count.get() + 1);
-      consecutiveFails = 0;
-    } else {
-      filtered_count.set(filtered_count.get() + 1);
-      consecutiveFails++;
-
-      // In case of a lot of consecutive failures, send a heartbeat in order to
-      // avoid timeout
-      if (((consecutiveFails % heartbeatInterval) == 0) && (reporter != null)) {
-        reporter.progress();
-      }
    }
  }

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (working copy)
@@ -63,7 +63,7 @@
      skewJoinKeyContext.initiliaze(hconf);
      skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
    }
-    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
+    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs);
  }

  @Override
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (working copy)
@@ -75,10 +75,16 @@
   *
   */
  public static enum Counter {
-    DESERIALIZE_ERRORS
+    DESERIALIZE_ERRORS,
+    RECORDS_IN
  }

  private final transient LongWritable deserialize_error_count = new LongWritable();
+  private final transient LongWritable recordCounter = new LongWritable();
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+  protected final boolean isInfoEnabled = LOG.isInfoEnabled();
+  protected final boolean isDebugEnabled = LOG.isDebugEnabled();

  private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
  private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
@@ -362,7 +368,7 @@

    for (String onealias : aliases) {
      Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-      if (LOG.isDebugEnabled()) {
+      if (isDebugEnabled) {
        LOG.debug("Adding alias " + onealias + " to work list for file "
            + onefile);
      }
@@ -380,8 +386,10 @@
      if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
        children.add(op);
        childrenOpToOpCtxMap.put(op, opCtx);
-        LOG.info("dump " + op + " "
+        if (isInfoEnabled) {
+          LOG.info("dump " + op + " "
            + opCtxMap.get(inp).rowObjectInspector.getTypeName());
+        }
      }
      current = opCtx; // just need for TestOperators.testMapOperator
    }
@@ -406,7 +414,14 @@
  public void initializeOp(Configuration hconf) throws HiveException {
    // set that parent initialization is done and call initialize on children
    state = State.INIT;
-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+    if (isInfoEnabled) {
+      String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_IN + context, recordCounter);
+    }

    List<Operator<? extends OperatorDesc>> children = getChildOperators();

@@ -451,6 +466,7 @@
        op.close(abort);
      }
    }
+    recordCounter.set(numRows);
  }

  // Find context for current input file
@@ -473,7 +489,9 @@
      MapOpCtx context = opCtxMap.get(inp);
      if (context != null) {
        current = context;
-        LOG.info("Processing alias " + onealias + " for file " + onefile);
+        if (isInfoEnabled) {
+          LOG.info("Processing alias " + onealias + " for file " + onefile);
+        }
        return;
      }
    }
@@ -533,6 +551,12 @@
    // The row has been converted to comply with table schema, irrespective of partition schema.
    // So, use tblOI (and not partOI) for forwarding
    try {
+      if (isInfoEnabled) {
+        if (++numRows == cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records read - " + numRows);
+        }
+      }
      forward(row, current.rowObjectInspector);
    } catch (Exception e) {
      // Serialize the row and output the error message.
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -61,6 +61,7 @@

  public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
  public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+  public static final String CONTEXT_NAME_KEY = "__hive.context.name";

  private transient Configuration configuration;
  protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -210,7 +211,7 @@

  // non-bean ..

-  protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+  protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
  @SuppressWarnings("rawtypes")
  protected transient OutputCollector out;
  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -287,9 +288,9 @@
    }
  }

-  public Map<Enum<?>, Long> getStats() {
-    HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
-    for (Enum<?> one : statsMap.keySet()) {
+  public Map<String, Long> getStats() {
+    HashMap<String, Long> ret = new HashMap<String, Long>();
+    for (String one : statsMap.keySet()) {
      ret.put(one, Long.valueOf(statsMap.get(one).get()));
    }
    return (ret);
@@ -807,7 +808,7 @@
  }

  public void resetStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
      statsMap.get(e).set(0L);
    }
  }
@@ -840,7 +841,7 @@
  }

  public void logStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
      LOG.info(e.toString() + ":" + statsMap.get(e).toString());
    }
  }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (working copy)
@@ -51,6 +51,7 @@
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.hash.MurmurHash;
@@ -65,6 +66,13 @@
    PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
  }

+  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT_INTERMEDIATE
+  }
+
  private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -144,9 +152,21 @@
  private StructObjectInspector recIdInspector; // OI for the record identifier
  private IntObjectInspector bucketInspector; // OI for the bucket field in the record id

+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+  private final transient LongWritable recordCounter = new LongWritable();
+
  @Override
  protected void initializeOp(Configuration hconf) throws HiveException {
    try {
+      if (isInfoEnabled) {
+        String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+        if (context != null && !context.isEmpty()) {
+          context = "_" + context.replace(" ","_");
+        }
+        statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+      }
+
      List<ExprNodeDesc> keys = conf.getKeyCols();

      if (isDebugEnabled) {
@@ -506,6 +526,12 @@
    // Since this is a terminal operator, update counters explicitly -
    // forward is not called
    if (null != out) {
+      if (isInfoEnabled) {
+        if (++numRows == cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records written - " + numRows);
+        }
+      }
      out.collect(keyWritable, valueWritable);
    }
  }
@@ -535,6 +561,10 @@
    }
    super.closeOp(abort);
    out = null;
+    if (isInfoEnabled) {
+      LOG.info(toString() + ": records written - " + numRows);
+      recordCounter.set(numRows);
+    }
  }

  /**
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (working copy)
@@ -236,8 +236,8 @@
  protected void initializeOp(Configuration hconf) throws HiveException {
    firstRow = true;

-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
-    statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+    statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count);

    try {
      this.hconf = hconf;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (working copy)
@@ -26,8 +26,11 @@
import java.util.List;
import java.util.Map;

+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -72,9 +75,6 @@
  private static boolean done;

  // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
-  private long numRows = 0;
-  private long nextCntr = 1;
  private MapredLocalWork localWork = null;
  private boolean isLogInfoEnabled = false;

@@ -84,8 +84,6 @@
  public void configure(JobConf job) {
    execContext = new ExecMapperContext(job);
    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());

    isLogInfoEnabled = l4j.isInfoEnabled();

@@ -176,15 +174,6 @@
        // Since there is no concept of a group, we don't invoke
        // startGroup/endGroup for a mapper
        mo.process((Writable)value);
-        if (isLogInfoEnabled) {
-          numRows++;
-          if (numRows == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecMapper: processing " + numRows
-                + " rows: used memory = " + used_memory);
-            nextCntr = getNextCntr(numRows);
-          }
-        }
      }
    } catch (Throwable e) {
      abort = true;
@@ -198,18 +187,6 @@
    }
  }

-
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
  @Override
  public void close() {
    // No row was processed
@@ -245,13 +222,7 @@
        }
      }

-      if (isLogInfoEnabled) {
-        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
-            + used_memory);
-      }
-
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
      mo.preorderMap(rps);
      return;
    } catch (Exception e) {
@@ -288,17 +259,20 @@
   */
  public static class ReportStats implements Operator.OperatorFunc {
    private final Reporter rp;
+    private final Configuration conf;

-    public ReportStats(Reporter rp) {
+    public ReportStats(Reporter rp, Configuration conf) {
      this.rp = rp;
+      this.conf = conf;
    }

    @Override
    public void func(Operator op) {
-      Map<Enum<?>, Long> opStats = op.getStats();
-      for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+      Map<String, Long> opStats = op.getStats();
+      for (Map.Entry<String, Long> e : opStats.entrySet()) {
        if (rp != null) {
-          rp.incrCounter(e.getKey(), e.getValue());
+          rp.incrCounter(HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+              e.getKey(), e.getValue());
        }
      }
    }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (working copy)
@@ -70,8 +70,6 @@
  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
  private static final String PLAN_KEY = "__REDUCE_PLAN__";

-  // used to log memory usage periodically
-  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
  // Input value serde needs to be an array to support different SerDe
  // for different tags
  private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
@@ -86,8 +84,6 @@
  private Reporter rp;
  private boolean abort = false;
  private boolean isTagged = false;
-  private long cntr = 0;
-  private long nextCntr = 1;
  private TableDesc keyTableDesc;
  private TableDesc[] valueTableDesc;
  private ObjectInspector[] rowObjectInspector;
@@ -103,8 +99,6 @@
    ObjectInspector keyObjectInspector;

    if (isInfoEnabled) {
-      LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
      try {
        LOG.info("conf classpath = "
            + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
@@ -245,17 +239,7 @@
        row.clear();
        row.add(keyObject);
        row.add(valueObject[tag]);
-        if (isInfoEnabled) {
-          cntr++;
-          if (cntr == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            if (isInfoEnabled) {
-              LOG.info("ExecReducer: processing " + cntr
-                  + " rows: used memory = " + used_memory);
-            }
-            nextCntr = getNextCntr(cntr);
-          }
-        }
+
        try {
          reducer.processOp(row, tag);
        } catch (Exception e) {
@@ -283,17 +267,6 @@
    }
  }

-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
  @Override
  public void close() {

@@ -310,13 +283,9 @@
        }
        reducer.endGroup();
      }
-      if (isInfoEnabled) {
-        LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
-            + memoryMXBean.getHeapMemoryUsage().getUsed());
-      }

      reducer.close(abort);
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
      reducer.preorderMap(rps);

    } catch (Exception e) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (working copy)
@@ -50,6 +50,7 @@
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -178,6 +179,8 @@
  private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
    JobConf conf = new JobConf(baseConf);

+    conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName());
+
    if (mapWork.getNumMapTasks() != null) {
      // Is this required ?
      conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
@@ -640,6 +643,8 @@
  private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
    JobConf conf = new JobConf(baseConf);

+    conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName());
+
    // Is this required ?
conf.set("mapred.reducer.class", ExecReducer.class.getName()); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (revision 1631004) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (working copy) @@ -269,11 +269,7 @@ @Override void run() throws Exception { - while (sources[position].pushRecord()) { - if (isLogInfoEnabled) { - logProgress(); - } - } + while (sources[position].pushRecord()) {} } @Override @@ -305,10 +301,7 @@ } } - if (isLogInfoEnabled) { - logCloseInfo(); - } - ReportStats rps = new ReportStats(reporter); + ReportStats rps = new ReportStats(reporter, jconf); mapOp.preorderMap(rps); return; } catch (Exception e) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (revision 1631004) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (working copy) @@ -156,10 +156,7 @@ } mergeOp.close(abort); - if (isLogInfoEnabled) { - logCloseInfo(); - } - ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter); + ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf); mergeOp.preorderMap(rps); } catch (Exception e) { if (!abort) { @@ -190,9 +187,6 @@ row[0] = key; row[1] = value; mergeOp.processOp(row, 0); - if (isLogInfoEnabled) { - logProgress(); - } } } catch (Throwable e) { abort = true; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (revision 1631004) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (working copy) @@ -52,13 +52,10 @@ // used to log memory usage periodically - public static MemoryMXBean memoryMXBean; protected boolean isLogInfoEnabled = false; protected boolean isLogTraceEnabled = false; protected MRTaskReporter reporter; - private long numRows = 0; - private long nextUpdateCntr = 1; protected PerfLogger perfLogger = PerfLogger.getPerfLogger(); protected String CLASS_NAME = RecordProcessor.class.getName(); @@ -79,11 +76,6 @@ this.outputs = outputs; this.processorContext = processorContext; - // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - isLogInfoEnabled = l4j.isInfoEnabled(); isLogTraceEnabled = l4j.isTraceEnabled(); @@ -110,37 +102,6 @@ abstract void close(); - /** - * Log information to be logged at the end - */ - protected void logCloseInfo() { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory); - } - - /** - * Log number of records processed and memory used after processing many records - */ - protected void logProgress() { - numRows++; - if (numRows == nextUpdateCntr) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory); - nextUpdateCntr = getNextUpdateRecordCounter(numRows); - } - } - - private long getNextUpdateRecordCounter(long cntr) { - // A very simple counter to keep track of number of rows 
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
  protected void createOutputMap() {
    Preconditions.checkState(outMap == null, "Outputs should only be setup once");
    outMap = Maps.newHashMap();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (working copy)
@@ -165,11 +165,7 @@
    }

    // run the operator pipeline
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
+    while (sources[position].pushRecord()) {}
  }

  /**
@@ -208,7 +204,7 @@
        dummyOp.close(abort);
      }
    }
-    ReportStats rps = new ReportStats(reporter);
+    ReportStats rps = new ReportStats(reporter, jconf);
    reducer.preorderMap(rps);

  } catch (Exception e) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (working copy)
@@ -60,8 +60,6 @@
    try {
      heartbeatInterval = HiveConf.getIntVar(hconf,
          HiveConf.ConfVars.HIVESENDHEARTBEAT);
-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
    } catch (Throwable e) {
      throw new HiveException(e);
    }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (revision 1631004)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (working copy)
@@ -40,6 +40,13 @@
    // The row has been converted to comply with table schema, irrespective of partition schema.
    // So, use tblOI (and not partOI) for forwarding
    try {
+      if (isInfoEnabled) {
+        numRows += ((VectorizedRowBatch)value).size;
+        while (numRows > cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records read - " + numRows);
+        }
+      }
      forward(value, current.getRowObjectInspector());
    } catch (Exception e) {
      throw new HiveException("Hive Runtime Error while processing row ", e);
    }
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (revision 1631004)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (working copy)
@@ -109,55 +109,6 @@
    }
  }
-  public void testBaseFilterOperator() throws Throwable {
-    try {
-      System.out.println("Testing Filter Operator");
-      ExprNodeDesc col0 = TestExecDriver.getStringColumn("col0");
-      ExprNodeDesc col1 = TestExecDriver.getStringColumn("col1");
-      ExprNodeDesc col2 = TestExecDriver.getStringColumn("col2");
-      ExprNodeDesc zero = new ExprNodeConstantDesc("0");
-      ExprNodeDesc func1 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc(">", col2, col1);
-      ExprNodeDesc func2 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc("==", col0, zero);
-      ExprNodeDesc func3 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc("and", func1, func2);
-      assert (func3 != null);
-      FilterDesc filterCtx = new FilterDesc(func3, false);
-
-      // Configuration
-      Operator<FilterDesc> op = OperatorFactory.get(FilterDesc.class);
-      op.setConf(filterCtx);
-
-      // runtime initialization
-      op.initialize(new JobConf(TestOperators.class),
-          new ObjectInspector[] {r[0].oi});
-
-      for (InspectableObject oner : r) {
-        op.processOp(oner.o, 0);
-      }
-
-      Map<Enum<?>, Long> results = op.getStats();
-      System.out.println("filtered = "
-          + results.get(FilterOperator.Counter.FILTERED));
-      assertEquals(Long.valueOf(4), results
-          .get(FilterOperator.Counter.FILTERED));
-      System.out.println("passed = "
-          + results.get(FilterOperator.Counter.PASSED));
-      assertEquals(Long.valueOf(1), results.get(FilterOperator.Counter.PASSED));
-
-      /*
-       * for(Enum e: results.keySet()) { System.out.println(e.toString() + ":" +
-       * results.get(e)); }
-       */
-      System.out.println("Filter Operator ok");
-
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
  private void testTaskIds(String [] taskIds, String expectedAttemptId,
      String expectedTaskId) {
    Configuration conf = new JobConf(TestOperators.class);
    for (String one: taskIds) {
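Note (reviewer sketch, not part of the patch): the row-count bookkeeping added above to FileSinkOperator, MapOperator, ReduceSinkOperator and VectorMapOperator all follows the same idiom - count every record, log only when the count reaches the next power of ten, and publish the final total through a counter on close. A minimal, self-contained illustration of that idiom, assuming only the commons-logging and Hadoop classes already used by the patch (the class and method names here are hypothetical):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;

// Illustrative only: mirrors the numRows/cntr logging pattern used in the patch.
public class RecordCountLogger {
  private static final Log LOG = LogFactory.getLog(RecordCountLogger.class);

  private long numRows = 0;  // records seen so far
  private long cntr = 1;     // next threshold at which to log (1, 10, 100, ...)
  private final LongWritable recordCounter = new LongWritable(); // published on close

  // Call once per record; logs only when numRows hits the next power of ten,
  // so a long-running operator emits a handful of log lines instead of millions.
  public void noteRecord() {
    if (++numRows == cntr) {
      cntr *= 10;
      LOG.info(this + ": records written - " + numRows);
    }
  }

  // Call from closeOp(): log the final total and expose it via the counter,
  // which the ReportStats pass then forwards to the Hadoop/Tez counter group.
  public void close() {
    recordCounter.set(numRows);
    LOG.info(this + ": records written - " + numRows);
  }

  public LongWritable getCounter() {
    return recordCounter;
  }
}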