diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index 58d1638..6047c54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -66,8 +66,20 @@ */ public class ExecReducer extends MapReduceBase implements Reducer { + private static final Log LOG = LogFactory.getLog("ExecReducer"); private static final String PLAN_KEY = "__REDUCE_PLAN__"; + // used to log memory usage periodically + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + // Input value serde needs to be an array to support different SerDe + // for different tags + private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; + private final Object[] valueObject = new Object[Byte.MAX_VALUE]; + private final List row = new ArrayList(Utilities.reduceFieldNameList.size()); + private final boolean isLogInfoEnabled = LOG.isInfoEnabled(); + + // TODO: move to DynamicSerDe when it's ready + private Deserializer inputKeyDeserializer; private JobConf jc; private OutputCollector oc; private Operator reducer; @@ -76,23 +88,11 @@ private boolean isTagged = false; private long cntr = 0; private long nextCntr = 1; - - public static final Log l4j = LogFactory.getLog("ExecReducer"); - private boolean isLogInfoEnabled = false; - - // used to log memory usage periodically - private MemoryMXBean memoryMXBean; - - // TODO: move to DynamicSerDe when it's ready - private Deserializer inputKeyDeserializer; - // Input value serde needs to be an array to support different SerDe - // for different tags - private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE]; - - TableDesc keyTableDesc; - TableDesc[] valueTableDesc; - - ObjectInspector[] rowObjectInspector; + private TableDesc keyTableDesc; + private TableDesc[] valueTableDesc; + private ObjectInspector[] rowObjectInspector; + private Object keyObject; + private BytesWritable groupKey; @Override public void configure(JobConf job) { @@ -100,20 +100,16 @@ public void configure(JobConf job) { ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; - // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - - isLogInfoEnabled = l4j.isInfoEnabled(); + LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); try { - l4j.info("conf classpath = " + LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - l4j.info("thread classpath = " + LOG.info("thread classpath = " + Arrays.asList(((URLClassLoader) Thread.currentThread() .getContextClassLoader()).getURLs())); } catch (Exception e) { - l4j.info("cannot get classpath: " + e.getMessage()); + LOG.info("cannot get classpath: " + e.getMessage()); } jc = job; @@ -162,7 +158,7 @@ public void configure(JobConf job) { // initialize reduce operator tree try { - l4j.info(reducer.dump(0)); + LOG.info(reducer.dump(0)); reducer.initialize(jc, rowObjectInspector); } catch (Throwable e) { abort = true; @@ -175,13 +171,6 @@ public void configure(JobConf job) { } } - private Object keyObject; - private final Object[] valueObject = new Object[Byte.MAX_VALUE]; - - private BytesWritable groupKey; - - List row = new ArrayList(Utilities.reduceFieldNameList.size()); - public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { if (reducer.getDone()) { @@ -212,7 +201,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, groupKey = new BytesWritable(); } else { // If a operator wants to do some work at the end of a group - l4j.trace("End Group"); + LOG.trace("End Group"); reducer.endGroup(); } @@ -227,7 +216,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, } groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); - l4j.trace("Start Group"); + LOG.trace("Start Group"); reducer.setGroupKeyObject(keyObject); reducer.startGroup(); } @@ -253,7 +242,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, cntr++; if (cntr == nextCntr) { long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecReducer: processing " + cntr + LOG.info("ExecReducer: processing " + cntr + " rows: used memory = " + used_memory); nextCntr = getNextCntr(cntr); } @@ -279,7 +268,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else { - l4j.fatal(StringUtils.stringifyException(e)); + LOG.fatal(StringUtils.stringifyException(e)); throw new RuntimeException(e); } } @@ -301,17 +290,17 @@ public void close() { // No row was processed if (oc == null) { - l4j.trace("Close called no row"); + LOG.trace("Close called no row"); } try { if (groupKey != null) { // If a operator wants to do some work at the end of a group - l4j.trace("End Group"); + LOG.trace("End Group"); reducer.endGroup(); } if (isLogInfoEnabled) { - l4j.info("ExecReducer: processed " + cntr + " rows: used memory = " + LOG.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed()); } @@ -322,7 +311,7 @@ public void close() { } catch (Exception e) { if (!abort) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); + LOG.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e); }