Apache Nemo / NEMO-125

Fix data loss bug caused by SailfishSchedulingPolicy



      Description

This bug can be reproduced by changing beam_sample_executor_resources.json to

      [
        { "type": "Transient", "memory_mb": 512, "capacity": 5 }
      ]

      and then running mvn -Dtest=WordCountITCase#testSailfish test in /examples/beam

      In this WordCountITCase, 5 map tasks (which together output more than 10 elements) => the RELAY decoder creates 5 byte[]s, one for each map task's output elements => RELAY relays the byte[]s => the RELAY encoder writes the 5 byte[]s => REDUCE reads only 5 elements

      The root cause of the bug is that SerializedBlock#elementsCount of the RELAY vertex's output block gets set to the number of byte[]s the vertex receives, not the number of data elements. As a result, when the following REDUCE vertex is scheduled to the same executor as the problematic block, it reads only part of the data, since # byte[]s <= # data elements. This happens because FileBlock#readPartitions uses a LimitedInputStream that reads only as many elements as SerializedBlock#elementsCount, rather than the full data.
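To make the failure mode concrete, here is a minimal, self-contained sketch of the mismatch described above. The class and method names (ElementsCountBug, encodeAsChunks, readLimited) are hypothetical and not Nemo APIs; the sketch only shows how a reader that stops after `elementsCount` reads loses data when the count was taken over serialized byte[] chunks instead of elements.

```java
import java.io.*;
import java.util.*;

public class ElementsCountBug {
  // Serialize per-task outputs; each "map task" output becomes ONE byte[] chunk,
  // mimicking the RELAY decoder producing one byte[] per map task.
  static List<byte[]> encodeAsChunks(List<List<String>> perTaskOutputs) throws IOException {
    List<byte[]> chunks = new ArrayList<>();
    for (List<String> taskOutput : perTaskOutputs) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(bos);
      for (String element : taskOutput) {
        dos.writeUTF(element);
      }
      chunks.add(bos.toByteArray());
    }
    return chunks;
  }

  // Read elements back, but stop after `elementsCount` reads --
  // mimicking a LimitedInputStream-style partial read.
  static List<String> readLimited(List<byte[]> chunks, int elementsCount) throws IOException {
    ByteArrayOutputStream all = new ByteArrayOutputStream();
    for (byte[] chunk : chunks) {
      all.write(chunk);
    }
    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(all.toByteArray()));
    List<String> out = new ArrayList<>();
    for (int i = 0; i < elementsCount && dis.available() > 0; i++) {
      out.add(dis.readUTF());
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    // 5 "map tasks", each emitting 2 elements -> 10 data elements total.
    List<List<String>> mapOutputs = new ArrayList<>();
    for (int t = 0; t < 5; t++) {
      mapOutputs.add(Arrays.asList("t" + t + "-a", "t" + t + "-b"));
    }
    List<byte[]> chunks = encodeAsChunks(mapOutputs);

    // BUG: elementsCount recorded as the number of byte[] chunks (5), not elements (10),
    // so the limited read silently drops half the data.
    List<String> read = readLimited(chunks, chunks.size());
    System.out.println("read " + read.size() + " of 10 elements"); // read 5 of 10 elements
  }
}
```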

      It took me quite an effort to track down the root cause of this bug, because it manifests only when the REDUCE task is scheduled locally to its input block. The master branch has gotten away with it, as its tests have been deterministically scheduling tasks in a way that avoids triggering the bug.

      I'd suggest the following solutions to fix this bug and prevent future bugs like this from creeping up:

      • If possible, read file blocks as-is, and refrain from using things like LimitedInputStream to read files partially, as that makes the system more vulnerable to data loss.
      • Run integration tests with a single big executor to check that things still work when all data handling is done locally in a single executor.
      • Try to share code between local reads and remote reads. The only difference between the two should be where the data is fetched from; after that, and until the data is handed over to TaskExecutor, they should use the same code path.
      • If it helps, use a random scheduling policy with a deterministic seed to test corner cases.
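The last suggestion can be sketched as follows. This is not the Nemo SchedulingPolicy API; the class SeededRandomSchedulingPolicy and its chooseExecutor method are hypothetical names, illustrating only the idea that random placement driven by a fixed seed explores corner-case schedules while remaining exactly replayable.

```java
import java.util.*;

public class SeededRandomSchedulingPolicy {
  private final long seed;
  private final Random random;

  public SeededRandomSchedulingPolicy(long seed) {
    this.seed = seed;
    this.random = new Random(seed); // deterministic stream of placement decisions
  }

  /** Pick an executor for the next task; the seed makes failures reproducible. */
  public String chooseExecutor(List<String> executorIds) {
    return executorIds.get(random.nextInt(executorIds.size()));
  }

  public long getSeed() {
    return seed; // log this on test failure so the schedule can be replayed
  }

  public static void main(String[] args) {
    List<String> executors = Arrays.asList("executor-1", "executor-2", "executor-3");
    // Two policies built from the same seed make identical placement decisions,
    // so a bug triggered by one run can be reproduced exactly.
    SeededRandomSchedulingPolicy a = new SeededRandomSchedulingPolicy(42L);
    SeededRandomSchedulingPolicy b = new SeededRandomSchedulingPolicy(42L);
    for (int i = 0; i < 5; i++) {
      String fromA = a.chooseExecutor(executors);
      String fromB = b.chooseExecutor(executors);
      System.out.println(fromA + " == " + fromB);
    }
  }
}
```

With such a policy, a CI failure report only needs to include the seed; rerunning the suite with that seed reproduces the exact task placement, including local-read corner cases like the one in this issue.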

              People

              • Assignee: Sanha Lee (sanha)
              • Reporter: John Yang (johnyangk)
