Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Fixed
0.5-incubating
None
Description
Hi BatchEE-Team,
I'm not able to stop a chunk-processing step properly via JobOperator.stop(). I always get an exception claiming that one of the metrics is wrong:
2020-03-10 15:42:34.668 CET ; [5397] ; [AN72] ; [SEVERE] ; an72.org.apache.batchee.container.impl.controller.chunk.ChunkStepController ; Failure in Read-Process-Write Loop
java.lang.IllegalStateException: Somehow one of the metrics was zero. Read count: 56, Filter count: -1, Write count: 57
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.updateNormalMetrics(ChunkStepController.java:650)
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeChunk(ChunkStepController.java:613)
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeCoreStep(ChunkStepController.java:732)
at org.apache.batchee.container.impl.controller.BaseStepController.execute(BaseStepController.java:157)
at org.apache.batchee.container.impl.controller.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java:106)
at org.apache.batchee.container.impl.controller.JobThreadRootController.originateExecutionOnThread(JobThreadRootController.java:110)
at org.apache.batchee.container.util.BatchWorkUnit.run(BatchWorkUnit.java:62)
After investigating the code, I found the root cause. First, let's walk through the request flow:
Related class: ChunkStepController
First, invokeChunk is called, which in turn calls readAndProcess:
    private void invokeChunk() {
        ...
        while (true) {
            ...
            final List<Object> chunkToWrite = readAndProcess();
            ...
        }
readAndProcess increments the read counter (itemsTouchedInCurrentChunk) at the start of every loop iteration, before it knows whether another item is actually available:
    private List<Object> readAndProcess() {
        List<Object> chunkToWrite = new ArrayList<Object>();
        Object itemRead;
        Object itemProcessed;
        int readProcessedCount = 0;
        while (true) {
            currentItemStatus = new SingleItemStatus();
            currentChunkStatus.incrementItemsTouchedInCurrentChunk(); // inc read counter
            itemRead = readItem(); // now we call the readItem method
            ...
        }
    }
Let's assume no items are left. Then the readItem method will mark the currentChunkStatus as finished:
    private Object readItem() {
        Object itemRead = null;
        ...
        itemRead = readerProxy.readItem(); // this returns null in our case
        ...
        // itemRead == null means we reached the end of
        // the readerProxy "resultset"
        currentChunkStatus.setFinished(itemRead == null); // so we mark the chunk as finished
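The counting order described above can be reproduced with a small simulation. This is only a sketch: ReadLoopSketch, run and Result are hypothetical names, and a plain Iterator stands in for the BatchEE reader proxy. It mirrors the order of operations in readAndProcess/readItem: increment the touched counter, then read, and mark the chunk finished when the read returns null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the readAndProcess()/readItem() ordering; not BatchEE code.
final class ReadLoopSketch {

    // Outcome of one simulated chunk.
    static final class Result {
        final int itemsTouched;           // mirrors getItemsTouchedInCurrentChunk()
        final boolean finished;           // mirrors currentChunkStatus.isFinished()
        final List<Object> chunkToWrite;  // items that would be handed to the writer

        Result(int itemsTouched, boolean finished, List<Object> chunkToWrite) {
            this.itemsTouched = itemsTouched;
            this.finished = finished;
            this.chunkToWrite = chunkToWrite;
        }
    }

    // Reads until the iterator is exhausted; checkpoint/item-count limits are not simulated.
    static Result run(Iterator<?> reader) {
        List<Object> chunkToWrite = new ArrayList<>();
        int itemsTouched = 0;
        boolean finished = false;
        while (!finished) {
            itemsTouched++;                                   // incremented BEFORE the read
            Object itemRead = reader.hasNext() ? reader.next() : null;
            finished = (itemRead == null);                    // null read = end of data
            if (!finished) {
                chunkToWrite.add(itemRead);                   // processing step omitted
            }
        }
        return new Result(itemsTouched, finished, chunkToWrite);
    }
}
```

With three items the loop touches four times but collects only three items: the extra increment belongs to the final null read.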
Control then returns to invokeChunk(), which eventually calls updateNormalMetrics(chunkToWrite.size()).
If the chunk is marked as finished, this method decrements the read count again. That is correct here: the counter was incremented before the final readItem() call, but that call did not actually read an item:
    private void updateNormalMetrics(int writeCount) {
        int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
        if (currentChunkStatus.isFinished()) {
            readCount--;
        }
        int filterCount = readCount - writeCount;
        if (readCount < 0 || filterCount < 0 || writeCount < 0) {
            throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount
                    + ", Filter count: " + filterCount + ", Write count: " + writeCount);
        }
So far so good.
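The arithmetic of updateNormalMetrics can be isolated into a pure function to check the end-of-data case in isolation. MetricsSketch.compute is a hypothetical helper, not part of BatchEE; it copies the decrement, the filter-count formula and the negativity check from the excerpt.

```java
// Hypothetical pure-function copy of the updateNormalMetrics() arithmetic; not BatchEE code.
final class MetricsSketch {

    // Returns {readCount, filterCount, writeCount}, or throws like the original check.
    static int[] compute(int itemsTouched, boolean finished, int writeCount) {
        int readCount = itemsTouched;
        if (finished) {
            readCount--;  // compensates for the increment before a final null read
        }
        int filterCount = readCount - writeCount;
        if (readCount < 0 || filterCount < 0 || writeCount < 0) {
            throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount
                    + ", Filter count: " + filterCount + ", Write count: " + writeCount);
        }
        return new int[] {readCount, filterCount, writeCount};
    }
}
```

For a chunk that ended because the reader returned null after three items, compute(4, true, 3) yields a read count of 3 and a filter count of 0, which is the expected result.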
Now assume the chunk step does not run to completion but is stopped via JobOperator.stop(). In that case readAndProcess also marks the chunk as finished:
    private List<Object> readAndProcess() {
        ...
        while (true) {
            currentItemStatus = new SingleItemStatus();
            currentChunkStatus.incrementItemsTouchedInCurrentChunk();
            itemRead = readItem();
            ...
            // write buffer size reached
            // This will force the current item to finish processing on a stop request
            if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
                currentChunkStatus.setFinished(true);
            }
            ...
            // last record in readerProxy reached
            if (currentChunkStatus.isFinished()) {
                // it's not the last record but we stopped the processing
                break;
            }
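To contrast the stop path with the end-of-data path, the sketch below forces the finished flag after an item has been read and added to the chunk. StopLoopSketch and its stopAfterReads parameter are hypothetical; the size check is only a stand-in for stepContext.getBatchStatus().equals(BatchStatus.STOPPING). In this path the touched count equals the number of items actually read, so nothing should be subtracted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the STOPPING branch in readAndProcess(); not BatchEE code.
final class StopLoopSketch {

    static final class Result {
        final int itemsTouched;  // mirrors getItemsTouchedInCurrentChunk()
        final boolean finished;  // mirrors currentChunkStatus.isFinished()
        final int chunkSize;     // mirrors chunkToWrite.size()

        Result(int itemsTouched, boolean finished, int chunkSize) {
            this.itemsTouched = itemsTouched;
            this.finished = finished;
            this.chunkSize = chunkSize;
        }
    }

    // stopAfterReads: number of items read before the simulated stop request is observed.
    static Result run(Iterator<?> reader, int stopAfterReads) {
        List<Object> chunkToWrite = new ArrayList<>();
        int itemsTouched = 0;
        boolean finished = false;
        while (true) {
            itemsTouched++;                                   // incremented before the read
            Object itemRead = reader.hasNext() ? reader.next() : null;
            finished = (itemRead == null);                    // end of data
            if (!finished) {
                chunkToWrite.add(itemRead);                   // processing step omitted
            }
            boolean stopping = chunkToWrite.size() >= stopAfterReads; // stand-in for STOPPING
            if (stopping) {
                finished = true;                              // forced finish, although an item WAS read
            }
            if (finished) {
                break;
            }
        }
        return new Result(itemsTouched, finished, chunkToWrite.size());
    }
}
```

With 100 available items and a stop observed after 57 reads, the chunk ends with 57 touched items, 57 items to write and finished set to true, even though the reader was never exhausted.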
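Control then returns to invokeChunk(), which again calls updateNormalMetrics(chunkToWrite.size()). This time, however, the chunk is not finished because the reader ran out of items: the last readItem() call did return an item. The decrement therefore makes no sense: the read count ends up one lower than the write count, the filter count becomes -1, and the check below throws the IllegalStateException from the log: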
    private void updateNormalMetrics(int writeCount) {
        int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
        if (currentChunkStatus.isFinished()) {
            readCount--;
        }
        int filterCount = readCount - writeCount;
        if (readCount < 0 || filterCount < 0 || writeCount < 0) {
            throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount
                    + ", Filter count: " + filterCount + ", Write count: " + writeCount);
        }
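Plugging the numbers from the log into this arithmetic shows the off-by-one: a read count of 56 after the decrement implies 57 touched items, and 57 items were written, so the filter count comes out as 56 - 57 = -1. One possible direction for a fix is sketched below, assuming a separate hypothetical flag (readerExhausted) that is set only when readItem() returns null, and decrementing only in that case. StopMetricsSketch and its members are illustrative names, and this is not necessarily the change that resolved this issue.

```java
// Hypothetical sketch contrasting the reported decrement with one possible fix; not BatchEE code.
final class StopMetricsSketch {

    // Snapshot of a chunk at the time updateNormalMetrics() runs.
    static final class ChunkOutcome {
        final int itemsTouched;         // getItemsTouchedInCurrentChunk()
        final boolean finished;         // set on end of data AND on STOPPING
        final boolean readerExhausted;  // hypothetical flag: set only when readItem() returned null

        ChunkOutcome(int itemsTouched, boolean finished, boolean readerExhausted) {
            this.itemsTouched = itemsTouched;
            this.finished = finished;
            this.readerExhausted = readerExhausted;
        }
    }

    // Behaviour described in this report: decrement whenever the chunk is finished.
    static int readCountAsReported(ChunkOutcome c) {
        return c.finished ? c.itemsTouched - 1 : c.itemsTouched;
    }

    // Possible fix: decrement only when the last increment belonged to a null read.
    static int readCountFixed(ChunkOutcome c) {
        return c.readerExhausted ? c.itemsTouched - 1 : c.itemsTouched;
    }

    // Filter count as computed in updateNormalMetrics().
    static int filterCount(int readCount, int writeCount) {
        return readCount - writeCount;
    }
}
```

For the stopped chunk, new ChunkOutcome(57, true, false) with 57 writes gives a reported read count of 56 (filter count -1, the values in the exception), while the fixed variant gives 57 (filter count 0). For a chunk that genuinely hit end of data, such as new ChunkOutcome(4, true, true) with 3 writes, both variants give a read count of 3.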