diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 05558d7..9c34ada 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -305,8 +305,8 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t task.setNextPreRoutedEventId(response.getNextPreRoutedEventId()); List taskEvents = null; if (response.getEvents() != null && !response.getEvents().isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" + if (LOG.isInfoEnabled()) { + LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size() + " fromEventId=" + fromEventId + " nextFromEventId=" + response.getNextFromEventId()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java index b7ed0c1..7d51658 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java @@ -52,7 +52,9 @@ public static MapredContext init(boolean isMap, JobConf jobConf) { HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ? new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf); contexts.set(context); - logger.debug("MapredContext initialized."); + if (logger.isDebugEnabled()) { + logger.debug("MapredContext initialized."); + } return context; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index be141c2..7b312a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -503,6 +503,7 @@ protected void initializeChildren(Configuration hconf) throws HiveException { } public void abort() { + LOG.info("Received abort in operator: {}", getName()); abortOp.set(true); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index f4a9cac..dbd0a08 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -113,11 +113,13 @@ void init(MRTaskReporter mrReporter, Map inputs, Map outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(mrReporter, inputs, outputs); + checkAbortCondition(); String key = processorContext.getTaskVertexName() + MAP_PLAN_KEY; cacheKeys.add(key); + // create map and fetch operators mapWork = (MapWork) cache.retrieve(key, new Callable() { @Override @@ -125,6 +127,9 @@ public Object call() { return Utilities.getMapWork(jconf); } }); + // TODO: Reviewer: Does stuff like this need to be cleaned up. Assuming this is setting up thread locals. + // I don't see any cleanup if a regular exception is thrown. Assuming the intent is for the next + // executor to overwrite the thread local variable ? Utilities.setMapWork(jconf, mapWork); String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); @@ -139,6 +144,7 @@ public Object call() { key = processorContext.getTaskVertexName() + prefix; cacheKeys.add(key); + checkAbortCondition(); mergeWorkList.add( (MapWork) cache.retrieve(key, new Callable() { @@ -155,6 +161,11 @@ public Object call() { ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); // Update JobConf using MRInput, info like filename comes via this + // TODO: abort handling: This is a potentially blocking call on MRInputs to become ready. + // It would get interrupted, unless the interrupt has been swallowed by some other component. + // MRInput does this immediately. MultiMRInput does not, and waits for events. + // That is potentially prone to issues on interrupts getting swallowed. + checkAbortCondition(); legacyMRInput = getMRInput(inputs); if (legacyMRInput != null) { Configuration updatedConf = legacyMRInput.getConfigUpdates(); @@ -164,6 +175,7 @@ public Object call() { } } } + checkAbortCondition(); createOutputMap(); // Start all the Outputs. @@ -172,6 +184,7 @@ public Object call() { outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } + checkAbortCondition(); try { @@ -181,6 +194,13 @@ public Object call() { } else { mapOp = new MapOperator(runtimeCtx); } + // Not synchronizing creation of mapOp with an invocation. Check immediately + // after creation in case abort has been set. + // Relying on the regular flow to clean up the actual operator. i.e. If an exception is + // thrown, an attempt will be made to cleanup the op. + // If we are here - exit out via an exception. If we're in the middle of the opeartor.initialize + // call further down, we rely upon op.abort(). + checkAbortCondition(); mapOp.clearConnectedOperators(); mapOp.setExecContext(execContext); @@ -189,6 +209,9 @@ public Object call() { if (mergeWorkList != null) { AbstractMapOperator mergeMapOp = null; for (BaseWork mergeWork : mergeWorkList) { + // TODO: Reviewer: What is this doing ? Looks like additional operators are potentially initialized ? Which speciifc case is this - SMB join ? + // If that's the case - may need to change the patch around a little to ensure that abort reaches these specific operators as well. + // Why the special handling, this should have been part of the hierarchy. MapWork mergeMapWork = (MapWork) mergeWork; if (mergeMapWork.getVectorMode()) { mergeMapOp = new VectorMapOperator(runtimeCtx); @@ -202,6 +225,7 @@ public Object call() { mergeMapOp.setConf(mergeMapWork); l4j.info("Input name is " + mergeMapWork.getName()); jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName()); + // TODO: This particular call can absolutely be blocking, and will not receive any information about an abort. mergeMapOp.initialize(jconf, null); // if there are no files/partitions to read, we need to skip trying to read MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName()); @@ -256,17 +280,20 @@ public Object call() { l4j.info("Main input name is " + mapWork.getName()); jconf.set(Utilities.INPUT_NAME, mapWork.getName()); mapOp.initialize(jconf, null); + checkAbortCondition(); mapOp.setChildren(jconf); mapOp.passExecContext(execContext); l4j.info(mapOp.dump(0)); mapOp.initializeLocalWork(jconf); + checkAbortCondition(); initializeMapRecordSources(); mapOp.initializeMapOperator(jconf); if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { for (AbstractMapOperator mergeMapOp : mergeMapOpList) { jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); + // TODO: abort handling: Handling of mergeMapOp mergeMapOp.initializeMapOperator(jconf); } } @@ -279,6 +306,7 @@ public Object call() { if (dummyOps != null) { for (Operator dummyOp : dummyOps){ dummyOp.setExecContext(execContext); + // TODO: Reviewer: What is this doing, and does this require special handling for abort dummyOp.initialize(jconf, null); } } @@ -293,6 +321,10 @@ public Object call() { // will this be true here? // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing MapRecordProcessor. Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Map operator initialization failed", e); } @@ -368,12 +400,16 @@ void run() throws Exception { @Override public void abort() { - // this will stop run() from pushing records + // this will stop run() from pushing records, along with potentially + // blocking initialization. super.abort(); // this will abort initializeOp() if (mapOp != null) { + l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName()); mapOp.abort(); + } else { + l4j.info("mapOp not setup yet. abort not being forwarded"); } } @@ -439,6 +475,8 @@ private MRInputLegacy getMRInput(Map inputs) throws Except li.add(inp); } } + // TODO: Abort handling: Move this to use a loop with a timed wait once TEZ-3302 is done. + // Otherwise, implement the time wait logic within Hive itself. processorContext.waitForAllInputsReady(li); l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index 6fad405..d77d811 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -73,6 +73,7 @@ public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext cont void init( MRTaskReporter mrReporter, Map inputs, Map outputs) throws Exception { + // TODO: Abort handling. Where is this processor used ? perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(mrReporter, inputs, outputs); execContext = new ExecMapperContext(jconf); @@ -134,6 +135,10 @@ public Object call() { // will this be true here? // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Map operator initialization failed", e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index a373ad6..77c7fa3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -89,6 +89,8 @@ void init(MRTaskReporter mrReporter, isLogInfoEnabled = l4j.isInfoEnabled(); isLogTraceEnabled = l4j.isTraceEnabled(); + checkAbortCondition(); + //log classpaths try { if (l4j.isDebugEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 415df92..8cda050 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -120,6 +120,9 @@ void init( MapredContext.init(false, new JobConf(jconf)); List shuffleInputs = getShuffleInputs(inputs); + // TODO: abort: Potential block in case an interrupt is swallowed by some compoenent. + // TODO: Make use of TEZ-3302 to get rid of this, or implement the same in Hive itself. + checkAbortCondition(); if (shuffleInputs != null) { l4j.info("Waiting for ShuffleInputs to become ready"); processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); @@ -132,6 +135,8 @@ void init( for (BaseWork mergeWork : mergeWorkList) { ReduceWork mergeReduceWork = (ReduceWork) mergeWork; reducer = mergeReduceWork.getReducer(); + // Check immediately after reducer is assigned, in cae the abort came in during + checkAbortCondition(); DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer); connectOps.put(mergeReduceWork.getTag(), dummyStoreOp); tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork); @@ -139,6 +144,7 @@ void init( ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps); } + checkAbortCondition(); bigTablePosition = (byte) reduceWork.getTag(); @@ -147,6 +153,8 @@ void init( ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); int numTags = reduceWork.getTagToValueDesc().size(); reducer = reduceWork.getReducer(); + // Check immediately after reducer is assigned, in cae the abort came in during + checkAbortCondition(); if (numTags > 1) { sources = new ReduceRecordSource[numTags]; mainWorkOIs = new ObjectInspector[numTags]; @@ -160,11 +168,15 @@ void init( for (int i : tagToReducerMap.keySet()) { redWork = tagToReducerMap.get(i); reducer = redWork.getReducer(); + // Check immediately after reducer is assigned, in cae the abort came in during + checkAbortCondition(); initializeSourceForTag(redWork, i, mainWorkOIs, sources, redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0)); reducer.initializeLocalWork(jconf); } reducer = reduceWork.getReducer(); + // Check immediately after reducer is assigned, in cae the abort came in during + checkAbortCondition(); ((TezContext) MapredContext.get()).setRecordSources(sources); reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] }); for (int i : tagToReducerMap.keySet()) { @@ -173,9 +185,12 @@ void init( } redWork = tagToReducerMap.get(i); reducer = redWork.getReducer(); + // Check immediately after reducer is assigned, in cae the abort came in during + checkAbortCondition(); reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] }); } } + checkAbortCondition(); reducer = reduceWork.getReducer(); // initialize reduce operator tree @@ -188,7 +203,9 @@ void init( List dummyOps = redWork.getDummyOps(); if (dummyOps != null) { for (HashTableDummyOperator dummyOp : dummyOps) { + // TODO: Abort handliing. Which op is this initializing, and does it have blocking operations dummyOp.initialize(jconf, null); + checkAbortCondition(); } } @@ -201,6 +218,7 @@ void init( createOutputMap(); OperatorUtils.setChildrenCollector(children, outMap); + checkAbortCondition(); reducer.setReporter(reporter); MapredContext.get().setReporter(reporter); @@ -209,6 +227,10 @@ void init( if (e instanceof OutOfMemoryError) { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Reduce operator initialization failed", e); } @@ -223,6 +245,7 @@ private void initializeMultipleSources(ReduceWork redWork, int numTags, ObjectIn if (redWork.getTagToValueDesc().get(tag) == null) { continue; } + checkAbortCondition(); initializeSourceForTag(redWork, tag, ois, sources, redWork.getTagToValueDesc().get(tag), redWork.getTagToInput().get(tag)); } @@ -270,12 +293,15 @@ void run() throws Exception { @Override public void abort() { - // this will stop run() from pushing records + // this will stop run() from pushing records, along with potentially + // blocking initialization. super.abort(); - // this will abort initializeOp() if (reducer != null) { + l4j.info("Forwarding abort to reducer: {} " + reducer.getName()); reducer.abort(); + } else { + l4j.info("reducer not setup yet. abort not being forwarded"); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index e7b7e43..7ccc0a1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -149,6 +149,14 @@ public void run(Map inputs, Map out } synchronized (this) { + // This check isn't absolutely mandatory, given the aborted check outside of the + // Processor creation. + if (aborted.get()) { + return; + } + // There should be no blocking operation in RecordProcessor creation, + // otherwise the abort operation will not register since they are synchronized on the same + // lock. if (isMap) { rproc = new MapRecordProcessor(jobConf, getContext()); } else { @@ -159,6 +167,7 @@ public void run(Map inputs, Map out if (!aborted.get()) { initializeAndRunProcessor(inputs, outputs); } + // KKK Should this just fall off, without an exception } protected void initializeAndRunProcessor(Map inputs, @@ -168,6 +177,10 @@ protected void initializeAndRunProcessor(Map inputs, try { MRTaskReporter mrReporter = new MRTaskReporter(getContext()); + // Init and run are both potentially long, and blocking operations. Synchronization + // with the 'abort' operation will not work since if they end up blocking on a monitor + // which does not belong to the lock, the abort will end up getting blocked. + // Both of these method invocations need to handle the abort call on their own. rproc.init(mrReporter, inputs, outputs); rproc.run(); @@ -203,13 +216,17 @@ protected void initializeAndRunProcessor(Map inputs, @Override public void abort() { - aborted.set(true); RecordProcessor rProcLocal; synchronized (this) { + LOG.info("Received abort"); + aborted.set(true); rProcLocal = rproc; } if (rProcLocal != null) { + LOG.info("Forwarding abort to RecordProcessor"); rProcLocal.abort(); + } else { + LOG.info("RecordProcessor not yet setup. Abort will be ignored"); } }