  1. BatchEE
  2. BATCHEE-138

Failure in Read-Process-Write Loop: Somehow one of the metrics was zero


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.5-incubating
    • Fix Version/s: 0.6
    • Component/s: jbatch-core
    • Labels: None

    Description

      Hi BatchEE-Team,

      I cannot cleanly stop a chunk step via JobOperator.stop(). The step always fails with an exception complaining about invalid metrics:

      2020-03-10 15:42:34.668 CET ; [5397] ; [AN72] ; [SEVERE] ; an72.org.apache.batchee.container.impl.controller.chunk.ChunkStepController ; Failure in Read-Process-Write Loop
      java.lang.IllegalStateException: Somehow one of the metrics was zero. Read count: 56, Filter count: -1, Write count: 57
       at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.updateNormalMetrics(ChunkStepController.java:650)
       at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeChunk(ChunkStepController.java:613)
       at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeCoreStep(ChunkStepController.java:732)
       at org.apache.batchee.container.impl.controller.BaseStepController.execute(BaseStepController.java:157)
       at org.apache.batchee.container.impl.controller.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java:106)
       at org.apache.batchee.container.impl.controller.JobThreadRootController.originateExecutionOnThread(JobThreadRootController.java:110)
       at org.apache.batchee.container.util.BatchWorkUnit.run(BatchWorkUnit.java:62)

      After investigating the code, I found the root cause. Let's walk through the control flow first:

      Related class: ChunkStepController

      First, invokeChunk() is called, which in turn calls readAndProcess():

      private void invokeChunk() {
          ...
          while (true) {
              ...
              final List<Object> chunkToWrite = readAndProcess();
              ...
          }
      }
      

       

      readAndProcess() increments the read counter before it calls readItem(), even though there may be no items left to read:

       

      private List<Object> readAndProcess() {
          List<Object> chunkToWrite = new ArrayList<Object>();
          Object itemRead;
          Object itemProcessed;
          int readProcessedCount = 0;
          while (true) {
              currentItemStatus = new SingleItemStatus();
              currentChunkStatus.incrementItemsTouchedInCurrentChunk(); // increment the read counter
              itemRead = readItem(); // only now is readItem() called
              ...
          }
      }

      Let's assume no items are left. Then the readItem method will mark the currentChunkStatus as finished:

       

      private Object readItem() {
          Object itemRead = null;
          ...
          itemRead = readerProxy.readItem(); // this returns null in our case
          ...
          // itemRead == null means we reached the end of
          // the readerProxy "resultset"
          currentChunkStatus.setFinished(itemRead == null); // so we mark the chunk as finished
          ...
      }
      

       

      Control then returns to invokeChunk(), which eventually calls updateNormalMetrics(chunkToWrite.size()).

      There the read counter is decremented again if the chunk is marked as finished. This compensates for the earlier increment, which was made even though no item was actually read:

       

      private void updateNormalMetrics(int writeCount) {
          int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
          if (currentChunkStatus.isFinished()) {
              readCount--;
          }
          int filterCount = readCount - writeCount;
          if (readCount < 0 || filterCount < 0 || writeCount < 0) {
              throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
                      ", Filter count: " + filterCount + ", Write count: " + writeCount);
          }
          ...
      }
      

       

      So far so good.
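
To make the arithmetic for the normal end-of-input case concrete, here is a minimal sketch. The class and method names are hypothetical stand-ins, not BatchEE code; only the decrement logic is taken from the updateNormalMetrics fragment quoted above. It assumes a reader that holds two items, so the third call to readItem() returns null.

```java
// Simplified stand-in for the quoted metric arithmetic; NOT the BatchEE class itself.
final class EndOfInputMetricsSketch {

    /** Mirrors the quoted correction: drop the last "touch" when the chunk is finished. */
    static int readCount(int itemsTouched, boolean finished) {
        return finished ? itemsTouched - 1 : itemsTouched;
    }

    /** Items read but not written, computed as in the quoted fragment. */
    static int filterCount(int itemsTouched, boolean finished, int writeCount) {
        return readCount(itemsTouched, finished) - writeCount;
    }

    public static void main(String[] args) {
        // Assumed scenario: touches 1 and 2 return items, touch 3 returns null.
        int itemsTouched = 3;
        boolean finished = true; // set by readItem() because it received null
        int writeCount = 2;      // both items passed the processor

        // prints "read=2 filter=0"
        System.out.println("read=" + readCount(itemsTouched, finished)
                + " filter=" + filterCount(itemsTouched, finished, writeCount));
    }
}
```

The decrement is correct here because the last increment did not correspond to a real item.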

      Now assume the chunk step does not run to completion but is stopped via JobOperator.stop(). In that case readAndProcess() marks the chunk as finished as well:

       

      private List<Object> readAndProcess() {
          ...
          while (true) {
              currentItemStatus = new SingleItemStatus();
              currentChunkStatus.incrementItemsTouchedInCurrentChunk();
              itemRead = readItem();
              ...
              // write buffer size reached
              // This will force the current item to finish processing on a stop request
              if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
                  currentChunkStatus.setFinished(true);
              }
              ...
              // last record in readerProxy reached
              if (currentChunkStatus.isFinished()) { // not the last record, but processing was stopped
                  break;
              }
              ...
          }
      }
      

       

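      Control returns to invokeChunk(), which again calls updateNormalMetrics(chunkToWrite.size()). This time, however, the last touched item was a real record rather than the end of the input, so the decrement is wrong: the read count ends up one below the write count, the filter count becomes -1, and the check throws the exception shown above: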

       

      private void updateNormalMetrics(int writeCount) {
          int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
          if (currentChunkStatus.isFinished()) {
              readCount--;
          }
          int filterCount = readCount - writeCount;
          if (readCount < 0 || filterCount < 0 || writeCount < 0) {
              throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
                      ", Filter count: " + filterCount + ", Write count: " + writeCount);
          }
          ...
      }
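
The failure can be reproduced in isolation with the numbers from the log. The sketch below is only an illustration: the class, the method names and the readerExhausted flag are hypothetical, and the second method is just one possible guard, not necessarily the change that went into 0.6. It assumes 57 items were touched and all 57 ended up in the chunk to write when the stop request arrived, which is consistent with the logged "Read count: 56, Filter count: -1, Write count: 57".

```java
// Illustration only; names are hypothetical and this is not the BatchEE implementation.
final class StopMetricsSketch {

    /** Arithmetic as quoted in the report: decrement whenever the chunk is finished. */
    static int filterCountAsReported(int itemsTouched, boolean finished, int writeCount) {
        int readCount = finished ? itemsTouched - 1 : itemsTouched;
        return readCount - writeCount;
    }

    /** Hypothetical guard: decrement only when the reader really returned null. */
    static int filterCountWithGuard(int itemsTouched, boolean readerExhausted, int writeCount) {
        int readCount = readerExhausted ? itemsTouched - 1 : itemsTouched;
        return readCount - writeCount;
    }

    public static void main(String[] args) {
        int itemsTouched = 57;           // every touch returned a real item
        int writeCount = 57;             // all of them reached the write buffer
        boolean finished = true;         // set because the batch status was STOPPING
        boolean readerExhausted = false; // the reader never returned null

        // prints -1, which trips the "filterCount < 0" check
        System.out.println(filterCountAsReported(itemsTouched, finished, writeCount));
        // prints 0
        System.out.println(filterCountWithGuard(itemsTouched, readerExhausted, writeCount));
    }
}
```

The point of the guard is that "finished" currently carries two meanings (end of input and stop requested), while the decrement is valid only for the first.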
      

       

       

       

       


          People

            romain.manni-bucau Romain Manni-Bucau
            chberger Christian Berger