Details
- Bug
- Status: Open
- Minor
- Resolution: Unresolved
- 2.9.2
- None
- None
Description
ShuffleMetrics keeps the shuffle counters in the NM (shuffle outputs failed/OK and current connections).
However, ShuffleMetrics is only invoked to do the counting when the ChannelFuture handled by ReduceMapFileCount is successful,
so on a network I/O error or any other failure, ShuffleMetrics does not count anything.
The code follows (comments starting with '###' were added by me).
ReduceMapFileCount (where ShuffleMetrics is invoked):
public void operationComplete(ChannelFuture future) throws Exception {
  if (!future.isSuccess()) {
    // #### return directly when unsuccessful
    future.getChannel().close();
    return;
  }
  int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
  if (waitCount == 0) {
    metrics.operationComplete(future);
    // Let the idle timer handler close keep-alive connections
    if (reduceContext.getKeepAlive()) {
      ChannelPipeline pipeline = future.getChannel().getPipeline();
      TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
      timeoutHandler.setEnabledTimeout(true);
    } else {
      future.getChannel().close();
    }
  } else {
    pipelineFact.getSHUFFLE().sendMap(reduceContext);
  }
}
ShuffleMetrics:
public void operationComplete(ChannelFuture future) throws Exception {
  if (future.isSuccess()) {
    shuffleOutputsOK.incr();
  } else {
    shuffleOutputsFailed.incr(); // ### never invoked
  }
  shuffleConnections.decr(); // ### not invoked when the future fails
}
So ShuffleMetrics should also be invoked when ReduceMapFileCount receives a failed future.
In ReduceMapFileCount:
public void operationComplete(ChannelFuture future) throws Exception {
  if (!future.isSuccess()) {
    metrics.operationComplete(future); // ### invoke when error
    future.getChannel().close();
    return;
  }
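To illustrate the effect on the counters, here is a minimal standalone sketch. It is not the Hadoop code: MetricsSketch, ListenerSketch and ShuffleMetricsDemo are hypothetical stand-ins that use plain AtomicLong counters and a boolean in place of the ChannelFuture. It also assumes that the connection counter is incremented once when a connection is opened and that every successful future is the last one for its reducer (waitCount == 0).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ShuffleMetrics: plain counters, no Hadoop metrics.
class MetricsSketch {
    final AtomicLong outputsOK = new AtomicLong();
    final AtomicLong outputsFailed = new AtomicLong();
    final AtomicLong connections = new AtomicLong();

    // Same branching as ShuffleMetrics.operationComplete in the description.
    void operationComplete(boolean success) {
        if (success) {
            outputsOK.incrementAndGet();
        } else {
            outputsFailed.incrementAndGet();
        }
        connections.decrementAndGet();
    }
}

// Hypothetical stand-in for ReduceMapFileCount, reduced to the metrics calls.
class ListenerSketch {
    private final MetricsSketch metrics;
    private final boolean reportFailures; // true = proposed behaviour

    ListenerSketch(MetricsSketch metrics, boolean reportFailures) {
        this.metrics = metrics;
        this.reportFailures = reportFailures;
    }

    void operationComplete(boolean success) {
        if (!success) {
            if (reportFailures) {
                metrics.operationComplete(false); // the proposed extra call
            }
            return; // the real listener also closes the channel here
        }
        // Assumption: this was the last map output to send (waitCount == 0).
        metrics.operationComplete(true);
    }
}

class ShuffleMetricsDemo {
    // Opens `total` connections, lets the first `failures` of them fail,
    // and returns the resulting counters.
    static MetricsSketch simulate(int total, int failures, boolean reportFailures) {
        MetricsSketch metrics = new MetricsSketch();
        ListenerSketch listener = new ListenerSketch(metrics, reportFailures);
        for (int i = 0; i < total; i++) {
            metrics.connections.incrementAndGet(); // assumed: counted on open
            listener.operationComplete(i >= failures);
        }
        return metrics;
    }

    public static void main(String[] args) {
        MetricsSketch current = simulate(5, 2, false);
        MetricsSketch proposed = simulate(5, 2, true);
        System.out.println("current:  failed=" + current.outputsFailed
            + " connections=" + current.connections);
        System.out.println("proposed: failed=" + proposed.outputsFailed
            + " connections=" + proposed.connections);
    }
}
```

With 5 connections of which 2 fail, the sketch of the current behaviour reports 0 failed outputs and leaves the connection counter at 2, while the sketch of the proposed behaviour reports 2 failed outputs and brings the connection counter back to 0.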