commit d86c6d3e1ca6568304053cbe369e0582caf064bd
Author: Ivan Suller
Date:   Wed Jul 31 15:12:50 2019 +0200

    HIVE-22065

    Change-Id: I5d3263a89b30f237783da7c506a5f7c7fca41b89

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index ea2e1fdb65..8c9d53f521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,7 +26,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -79,21 +77,22 @@
  * Just pump the records through the query plan.
  */
 public class MapRecordProcessor extends RecordProcessor {
-  public static final Logger l4j = LoggerFactory.getLogger(MapRecordProcessor.class);
-  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+  private static final Logger LOG = LoggerFactory.getLogger(MapRecordProcessor.class);
+  private static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   private AbstractMapOperator mapOp;
-  private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<AbstractMapOperator>();
+  private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<>();
   private MapRecordSource[] sources;
-  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
+  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<>();
   private int position;
-  MRInputLegacy legacyMRInput;
-  MultiMRInput mainWorkMultiMRInput;
+  private MRInputLegacy legacyMRInput;
+  private MultiMRInput mainWorkMultiMRInput;
   private final ExecMapperContext execContext;
   private MapWork mapWork;
-  List<MapWork> mergeWorkList;
-  List<String> cacheKeys, dynamicValueCacheKeys;
-  ObjectCache cache, dynamicValueCache;
+  private List<MapWork> mergeWorkList;
+  private final List<String> cacheKeys = new ArrayList<>();
+  private final List<String> dynamicValueCacheKeys = new ArrayList<>();
+  private final ObjectCache cache, dynamicValueCache;
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -105,16 +104,12 @@ public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) t
     dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
-    cacheKeys = new ArrayList<String>();
-    dynamicValueCacheKeys = new ArrayList<String>();
   }
 
   private void setLlapOfFragmentId(final ProcessorContext context) {
     // TODO: could we do this only if the OF is actually used?
     String attemptId = Converters.createTaskAttemptId(context).toString();
-    if (l4j.isDebugEnabled()) {
-      l4j.debug("Setting the LLAP fragment ID for OF to " + attemptId);
-    }
+    LOG.debug("Setting the LLAP fragment ID for OF to {}", attemptId);
     jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, attemptId);
   }
 
@@ -131,12 +126,7 @@ void init(MRTaskReporter mrReporter,
 
     // create map and fetch operators
-    mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
-      @Override
-      public Object call() {
-        return Utilities.getMapWork(jconf);
-      }
-    });
+    mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
     // TODO HIVE-14042. Cleanup may be required if exiting early.
     Utilities.setMapWork(jconf, mapWork);
@@ -147,7 +137,7 @@ public Object call() {
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
     if (prefixes != null) {
-      mergeWorkList = new ArrayList<MapWork>();
+      mergeWorkList = new ArrayList<>();
 
       for (final String prefix : prefixes.split(",")) {
         if (prefix == null || prefix.isEmpty()) {
@@ -159,13 +149,7 @@ public Object call() {
         checkAbortCondition();
         mergeWorkList.add(
-            (MapWork) cache.retrieve(key,
-                new Callable<Object>() {
-                  @Override
-                  public Object call() {
-                    return Utilities.getMergeWork(jconf, prefix);
-                  }
-                }));
+            (MapWork) cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix)));
       }
     }
@@ -189,7 +173,7 @@ public Object call() {
     createOutputMap();
     // Start all the Outputs.
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.debug("Starting Output: " + outputEntry.getKey());
+      LOG.debug("Starting Output: " + outputEntry.getKey());
       outputEntry.getValue().start();
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
@@ -233,25 +217,25 @@ public Object call() {
       // initialize the merge operators first.
       if (mergeMapOp != null) {
         mergeMapOp.setConf(mergeMapWork);
-        l4j.info("Input name is " + mergeMapWork.getName());
+        LOG.info("Input name is {}", mergeMapWork.getName());
         jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
         mergeMapOp.initialize(jconf, null);
         // if there are no files/partitions to read, we need to skip trying to read
         MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName());
         boolean skipRead = false;
         if (multiMRInput == null) {
-          l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read.");
+          LOG.info("Multi MR Input for work {} is null. Skipping read.", mergeMapWork.getName());
           skipRead = true;
         } else {
           Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders();
           if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) {
-            l4j.info("Key value readers are null or empty and hence skipping read. "
-                + "KeyValueReaders = " + keyValueReaders);
+            LOG.info("Key value readers are null or empty and hence skipping read. "
+                + "KeyValueReaders = {}", keyValueReaders);
             skipRead = true;
           }
         }
         if (skipRead) {
-          List<Operator<? extends OperatorDesc>> children = new ArrayList<Operator<? extends OperatorDesc>>();
+          List<Operator<? extends OperatorDesc>> children = new ArrayList<>();
           children.addAll(mergeMapOp.getConf().getAliasToWork().values());
           // do the same thing as setChildren when there is nothing to read.
          // the setChildren method initializes the object inspector needed by the operators
@@ -286,19 +270,19 @@ public Object call() {
 
       // initialize map operator
       mapOp.setConf(mapWork);
-      l4j.info("Main input name is " + mapWork.getName());
+      LOG.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       mapOp.initialize(jconf, null);
       checkAbortCondition();
       mapOp.setChildren(jconf);
       mapOp.passExecContext(execContext);
-      l4j.info(mapOp.dump(0));
+      LOG.info(mapOp.dump(0));
 
       // set memory available for operators
       long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
       if (mapOp.getConf() != null) {
         mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
-        l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+        LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
       }
       OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask);
 
@@ -309,12 +293,7 @@ public Object call() {
       String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
       // On LLAP dynamic value registry might already be cached.
       final DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
-          new Callable<DynamicValueRegistryTez>() {
-            @Override
-            public DynamicValueRegistryTez call() {
-              return new DynamicValueRegistryTez();
-            }
-          });
+          () -> new DynamicValueRegistryTez());
       dynamicValueCacheKeys.add(valueRegistryKey);
       RegistryConfTez registryConf = new RegistryConfTez(jconf, mapWork, processorContext, inputs);
       registryTez.init(registryConf);
@@ -322,7 +301,7 @@ public DynamicValueRegistryTez call() {
       checkAbortCondition();
       initializeMapRecordSources();
       mapOp.initializeMapOperator(jconf);
-      if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
+      if ((mergeMapOpList != null) && !mergeMapOpList.isEmpty()) {
        for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
           // TODO HIVE-14042. abort handling: Handling of mergeMapOp
@@ -354,7 +333,7 @@ public DynamicValueRegistryTez call() {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing MapRecordProcessor. Message={}",
+        LOG.info("Hit an interrupt while initializing MapRecordProcessor. Message={}",
Message={}", e.getMessage()); throw (InterruptedException) e; } else { @@ -383,7 +362,7 @@ private void initializeMapRecordSources() throws Exception { String inputName = mapOp.getConf().getName(); MultiMRInput multiMRInput = multiMRInputMap.get(inputName); Collection kvReaders = multiMRInput.getKeyValueReaders(); - l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); + LOG.debug("There are {} key-value readers for input {}", kvReaders.size(), inputName); if (kvReaders.size() > 0) { reader = getKeyValueReader(kvReaders, mapOp); sources[tag].init(jconf, mapOp, reader); @@ -392,10 +371,8 @@ private void initializeMapRecordSources() throws Exception { ((TezContext) MapredContext.get()).setRecordSources(sources); } - @SuppressWarnings("deprecation") private KeyValueReader getKeyValueReader(Collection keyValueReaders, - AbstractMapOperator mapOp) - throws Exception { + AbstractMapOperator mapOp) throws Exception { List kvReaderList = new ArrayList(keyValueReaders); // this sets up the map operator contexts correctly mapOp.initializeContexts(); @@ -436,10 +413,10 @@ public void abort() { // this will abort initializeOp() if (mapOp != null) { - l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName()); + LOG.info("Forwarding abort to mapOp: {} ", mapOp.getName()); mapOp.abort(); } else { - l4j.info("mapOp not setup yet. abort not being forwarded"); + LOG.info("mapOp not setup yet. abort not being forwarded"); } } @@ -450,13 +427,13 @@ void close(){ setAborted(execContext.getIoCxt().getIOExceptions()); } - if (cache != null && cacheKeys != null) { + if (cache != null) { for (String k: cacheKeys) { cache.release(k); } } - if (dynamicValueCache != null && dynamicValueCacheKeys != null) { + if (dynamicValueCache != null) { for (String k: dynamicValueCacheKeys) { dynamicValueCache.release(k); } @@ -491,7 +468,7 @@ void close(){ } catch (Exception e) { if (!isAborted()) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); + LOG.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators", e); } } finally { @@ -505,7 +482,7 @@ private MRInputLegacy getMRInput(Map inputs) throws Except MRInputLegacy theMRInput = null; // start all mr/multi-mr inputs - Set li = new HashSet(); + Set li = new HashSet<>(); for (LogicalInput inp: inputs.values()) { if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) { inp.start(); @@ -516,7 +493,7 @@ private MRInputLegacy getMRInput(Map inputs) throws Except // MultiMRInput may not. Fix once TEZ-3302 is resolved. 
     processorContext.waitForAllInputsReady(li);
 
-    l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
+    LOG.info("The input names are: {}", String.join(",", inputs.keySet()));
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {
         if (theMRInput != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index c55a3940c2..13f5f12989 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -135,7 +135,7 @@ public Object call() {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}",
+        LOG.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}",
             e.getMessage());
         throw (InterruptedException) e;
       } else {
@@ -184,7 +184,7 @@ void close() {
     } catch (Exception e) {
       if (!isAborted()) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
+        LOG.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators", e);
       }
@@ -217,7 +217,7 @@ private boolean processRow(Object key, Object value) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        l4j.error(StringUtils.stringifyException(e));
+        LOG.error(StringUtils.stringifyException(e));
         throw new RuntimeException(e);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 86390963c8..6697f62d13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -39,7 +39,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -52,7 +51,7 @@
   protected Map<String, OutputCollector> outMap;
   protected final ProcessorContext processorContext;
 
-  public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
 
   protected MRTaskReporter reporter;
 
@@ -78,7 +77,7 @@ void init(MRTaskReporter mrReporter,
     this.outputs = outputs;
 
     checkAbortCondition();
-    Utilities.tryLoggingClassPaths(jconf, l4j);
+    Utilities.tryLoggingClassPaths(jconf, LOG);
   }
 
   /**
@@ -91,7 +90,7 @@ void init(MRTaskReporter mrReporter,
   protected void createOutputMap() {
     Preconditions.checkState(outMap == null, "Outputs should only be setup once");
-    outMap = Maps.newHashMap();
+    outMap = new HashMap<>();
     for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
       TezKVOutputCollector collector = new TezKVOutputCollector(entry.getValue());
       outMap.put(entry.getKey(), collector);
@@ -102,22 +101,17 @@ protected void createOutputMap() {
       ObjectCache cache, List<String> cacheKeys)
       throws HiveException {
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
     if (prefixes != null) {
-      List<BaseWork> mergeWorkList = new ArrayList<BaseWork>();
+      List<BaseWork> mergeWorkList = new ArrayList<>();
 
       for (final String prefix : prefixes.split(",")) {
-        if (prefix == null || prefix.isEmpty()) {
+        if (prefix.isEmpty()) {
           continue;
         }
 
         key = prefix;
         cacheKeys.add(key);
 
-        mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {
-          @Override
-          public Object call() {
-            return Utilities.getMergeWork(jconf, prefix);
-          }
-        }));
+        mergeWorkList.add(cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix)));
       }
 
       return mergeWorkList;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 152dc98215..cbceb9c32d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -20,12 +20,10 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
@@ -63,21 +61,20 @@
  * Just pump the records through the query plan.
  */
 public class ReduceRecordProcessor extends RecordProcessor {
+  private static final Logger LOG = LoggerFactory.getLogger(ReduceRecordProcessor.class);
 
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
-  private ObjectCache cache, dynamicValueCache;
-
-  public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordProcessor.class);
+  private final ObjectCache cache, dynamicValueCache;
 
   private ReduceWork reduceWork;
 
-  List<BaseWork> mergeWorkList = null;
-  List<String> cacheKeys, dynamicValueCacheKeys;
+  private final List<BaseWork> mergeWorkList;
+  private final List<String> cacheKeys;
+  private final List<String> dynamicValueCacheKeys = new ArrayList<>();
 
-  private final Map<Integer, DummyStoreOperator> connectOps =
-      new TreeMap<Integer, DummyStoreOperator>();
-  private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<Integer, ReduceWork>();
+  private final Map<Integer, DummyStoreOperator> connectOps = new TreeMap<>();
+  private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<>();
 
   private Operator<?> reducer;
 
@@ -94,22 +91,15 @@ public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context
     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);
-    dynamicValueCacheKeys = new ArrayList<String>();
 
-    reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
-      @Override
-      public Object call() {
-        return Utilities.getReduceWork(jconf);
-      }
-    });
+    reduceWork = cache.retrieve(cacheKey, () -> Utilities.getReduceWork(jconf));
 
     Utilities.setReduceWork(jconf, reduceWork);
     mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys);
   }
 
   @Override
-  void init(
-      MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
+  void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+      throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(mrReporter, inputs, outputs);
 
@@ -118,28 +108,22 @@ void init(
     // TODO HIVE-14042. Move to using a loop and a timed wait once TEZ-3302 is fixed.
     checkAbortCondition();
     if (shuffleInputs != null) {
-      l4j.info("Waiting for ShuffleInputs to become ready");
+      LOG.info("Waiting for ShuffleInputs to become ready");
       processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
     }
 
     connectOps.clear();
     ReduceWork redWork = reduceWork;
-    l4j.info("Main work is " + reduceWork.getName());
+    LOG.info("Main work is " + reduceWork.getName());
     List<HashTableDummyOperator> workOps = reduceWork.getDummyOps();
-    HashSet<HashTableDummyOperator> dummyOps = workOps == null ? null : new HashSet<>(workOps);
+    HashSet<HashTableDummyOperator> dummyOps = workOps == null ? new HashSet<>() : new HashSet<>(workOps);
     tagToReducerMap.put(redWork.getTag(), redWork);
     if (mergeWorkList != null) {
       for (BaseWork mergeWork : mergeWorkList) {
-        if (l4j.isDebugEnabled()) {
-          l4j.debug("Additional work " + mergeWork.getName());
-        }
+        LOG.debug("Additional work {}", mergeWork.getName());
         workOps = mergeWork.getDummyOps();
         if (workOps != null) {
-          if (dummyOps == null) {
-            dummyOps = new HashSet<>(workOps);
-          } else {
-            dummyOps.addAll(workOps);
-          }
+          dummyOps.addAll(workOps);
         }
         ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
         reducer = mergeReduceWork.getReducer();
@@ -167,19 +151,14 @@ void init(
     long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
     if (reducer.getConf() != null) {
       reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
-      l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+      LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
     }
     OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
 
     // Setup values registry
     String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
-    DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
-        new Callable<DynamicValueRegistryTez>() {
-          @Override
-          public DynamicValueRegistryTez call() {
-            return new DynamicValueRegistryTez();
-          }
-        });
+    DynamicValueRegistryTez registryTez =
+        dynamicValueCache.retrieve(valueRegistryKey, () -> new DynamicValueRegistryTez());
     dynamicValueCacheKeys.add(valueRegistryKey);
     RegistryConfTez registryConf = new RegistryConfTez(jconf, reduceWork, processorContext, inputs);
     registryTez.init(registryConf);
@@ -200,15 +179,15 @@ public DynamicValueRegistryTez call() {
         reducer = redWork.getReducer();
         // Check immediately after reducer is assigned, in case the abort came in during
         checkAbortCondition();
-        initializeSourceForTag(redWork, i, mainWorkOIs, sources,
-            redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0));
+        initializeSourceForTag(redWork, i, mainWorkOIs, sources, redWork.getTagToValueDesc().get(0),
+            redWork.getTagToInput().get(0));
         reducer.initializeLocalWork(jconf);
       }
       reducer = reduceWork.getReducer();
       // Check immediately after reducer is assigned, in case the abort came in during
       checkAbortCondition();
       ((TezContext) MapredContext.get()).setRecordSources(sources);
-      reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] });
+      reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[bigTablePosition]});
       for (int i : tagToReducerMap.keySet()) {
         if (i == bigTablePosition) {
           continue;
         }
@@ -217,7 +196,7 @@ public DynamicValueRegistryTez call() {
         reducer = redWork.getReducer();
         // Check immediately after reducer is assigned, in case the abort came in during
         checkAbortCondition();
-        reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] });
+        reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[i]});
       }
     }
     checkAbortCondition();
@@ -226,25 +205,21 @@ public DynamicValueRegistryTez call() {
 
     // initialize reduce operator tree
     try {
-      l4j.info(reducer.dump(0));
+      LOG.info(reducer.dump(0));
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
       // dummy parent operators as well.
-      if (dummyOps != null) {
-        for (HashTableDummyOperator dummyOp : dummyOps) {
-          // TODO HIVE-14042. Propagating abort to dummyOps.
-          dummyOp.initialize(jconf, null);
-          checkAbortCondition();
-        }
+      for (HashTableDummyOperator dummyOp : dummyOps) {
+        // TODO HIVE-14042. Propagating abort to dummyOps.
+        dummyOp.initialize(jconf, null);
+        checkAbortCondition();
       }
 
       // set output collector for any reduce sink operators in the pipeline.
-      List<Operator<?>> children = new LinkedList<Operator<?>>();
+      List<Operator<?>> children = new ArrayList<>();
       children.add(reducer);
-      if (dummyOps != null) {
-        children.addAll(dummyOps);
-      }
+      children.addAll(dummyOps);
       createOutputMap();
       OperatorUtils.setChildrenCollector(children, outMap);
 
@@ -258,8 +233,7 @@ public DynamicValueRegistryTez call() {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}",
-            e.getMessage());
+        LOG.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}", e.getMessage());
         throw (InterruptedException) e;
       } else {
         throw new RuntimeException(redWork.getName() + " operator initialization failed", e);
@@ -281,9 +255,8 @@ private void initializeMultipleSources(ReduceWork redWork, int numTags, ObjectIn
     }
   }
 
-  private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois,
-      ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName)
-      throws Exception {
+  private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, ReduceRecordSource[] sources,
+      TableDesc valueTableDesc, String inputName) throws Exception {
     reducer = redWork.getReducer();
     reducer.getParentOperators().clear();
     reducer.setParentOperators(null); // clear out any parents as reducer is the root
@@ -295,9 +268,8 @@ private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector
     // Only the big table input source should be vectorized (if applicable)
     // Note this behavior may have to change if we ever implement a vectorized merge join
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
-    sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
-        valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(),
+    sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader,
+        tag == bigTablePosition, (byte) tag, redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(),
         redWork.getVectorizedTestingReducerBatchSize());
     ois[tag] = sources[tag].getObjectInspector();
   }
@@ -306,7 +278,7 @@ private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector
   void run() throws Exception {
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.info("Starting Output: " + outputEntry.getKey());
+      LOG.info("Starting Output: " + outputEntry.getKey());
       if (!isAborted()) {
         outputEntry.getValue().start();
         ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
@@ -327,10 +299,10 @@ public void abort() {
     super.abort();
 
     if (reducer != null) {
-      l4j.info("Forwarding abort to reducer: {} " + reducer.getName());
+      LOG.info("Forwarding abort to reducer: {} " + reducer.getName());
       reducer.abort();
     } else {
-      l4j.info("reducer not setup yet. abort not being forwarded");
+      LOG.info("reducer not setup yet. abort not being forwarded");
     }
   }
 
@@ -356,22 +328,22 @@ public void abort() {
   }
 
   @Override
-  void close(){
-    if (cache != null && cacheKeys != null) {
+  void close() {
+    if (cache != null) {
       for (String key : cacheKeys) {
         cache.release(key);
       }
     }
 
-    if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
-      for (String k: dynamicValueCacheKeys) {
+    if (dynamicValueCache != null) {
+      for (String k : dynamicValueCacheKeys) {
         dynamicValueCache.release(k);
       }
     }
 
     try {
       if (isAborted()) {
-        for (ReduceRecordSource rs: sources) {
+        for (ReduceRecordSource rs : sources) {
           if (!rs.close()) {
             setAborted(false); // Preserving the old logic. Hmm...
             break;
@@ -402,9 +374,8 @@ void close(){
     } catch (Exception e) {
       if (!isAborted()) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException(
-            "Hive Runtime Error while closing operators: " + e.getMessage(), e);
+        LOG.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e);
       }
     } finally {
       Utilities.clearWorkMap(jconf);
@@ -418,8 +389,7 @@ private DummyStoreOperator getJoinParentOp(Operator<?> mergeReduceOp) {
       if (childOp instanceof DummyStoreOperator) {
        return (DummyStoreOperator) childOp;
       } else {
-        throw new IllegalStateException("Was expecting dummy store operator but found: "
-            + childOp);
+        throw new IllegalStateException("Was expecting dummy store operator but found: " + childOp);
       }
     } else {
       return getJoinParentOp(childOp);
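
Note on the main refactoring pattern. Most hunks in this patch replace an anonymous Callable implementation, plus a cast at the call site, with a lambda passed to ObjectCache.retrieve. The standalone sketch below is NOT Hive's ObjectCache; it is a minimal, hypothetical stand-in (DemoCache, a name invented here) that only mirrors the generic retrieve(String, Callable<T>) shape, to show why the lambda form compiles and why the (MapWork)/(ReduceWork) casts can mostly disappear once the Callable's type parameter carries the result type:

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal sketch of a retrieve-or-compute cache with the same generic
    // shape as the retrieve(String, Callable<T>) calls in the patch.
    final class DemoCache {
      private final Map<String, Object> store = new ConcurrentHashMap<>();

      // On a miss, run the loader and remember the result. (Deliberately
      // simplistic: no locking around the load, unlike a production cache.)
      @SuppressWarnings("unchecked")
      <T> T retrieve(String key, Callable<T> loader) throws Exception {
        Object value = store.get(key);
        if (value == null) {
          value = loader.call();
          store.put(key, value);
        }
        return (T) value;
      }

      public static void main(String[] args) throws Exception {
        DemoCache cache = new DemoCache();

        // Before: anonymous Callable<Object> forces an explicit cast.
        String before = (String) cache.retrieve("plan", new Callable<Object>() {
          @Override
          public Object call() {
            return "map-plan";
          }
        });

        // After: a lambda; T is inferred as String, so no cast is needed.
        String after = cache.retrieve("plan", () -> "map-plan");

        System.out.println(before + " == " + after);
      }
    }

A residual cast such as "(MapWork) cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix))" in MapRecordProcessor remains because the lambda there is inferred with a broader result type than the list element type.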
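Note on the logging changes. The patch also renames the shared public l4j loggers to per-class private LOG constants and converts most messages from string concatenation (often wrapped in an explicit isDebugEnabled() guard) to SLF4J {} placeholders. The sketch below assumes only that org.slf4j:slf4j-api is on the classpath; LoggingDemo and logAttempt are illustrative names, not part of the patch:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Why the patch drops the isDebugEnabled() guards for simple arguments.
    final class LoggingDemo {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

      static void logAttempt(String attemptId) {
        // Before: concatenation builds the message string even when DEBUG
        // is disabled, so the code guarded the call explicitly.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting the LLAP fragment ID for OF to " + attemptId);
        }

        // After: the {} placeholder defers formatting until the level check
        // inside debug() passes, so no guard is needed for cheap arguments.
        LOG.debug("Setting the LLAP fragment ID for OF to {}", attemptId);
      }
    }

A guard is still worthwhile when computing the argument itself is expensive, which is why placeholders alone do not make every isDebugEnabled() check redundant.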