Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
In EpochTimeScheduler, `removeReadyTimers` returns the timers that are ready to be fired and also removes them from local bookkeeping. The timers are removed through the SetView returned by entrySet(). Because the iterator of that SetView is weakly consistent, some timers can survive the removal, which results in duplicate timers being fired.
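The sketch below is a minimal illustration of what "weakly consistent" means for a map view's iterator. It assumes the ready timers are kept in a `java.util.concurrent.ConcurrentHashMap`, which is an assumption: the issue only says the removal goes through the entrySet() view. The class and method names (`WeaklyConsistentDemo`, `iterateWhileMutating`, `newReadyTimers`) are hypothetical. The iterator is guaranteed to return every entry that existed when it was created, never throws `ConcurrentModificationException` when the map changes mid-iteration, and may or may not reflect those changes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a ready-timer map (timer key -> fire time).
// Assumes a ConcurrentHashMap; the issue does not name the concrete Map type.
class WeaklyConsistentDemo {

  // Iterates the entrySet() view while adding an entry to the same map,
  // and returns how many entries the iterator actually handed back.
  static int iterateWhileMutating(Map<String, Long> readyTimers) {
    int seen = 0;
    for (Map.Entry<String, Long> entry : readyTimers.entrySet()) {
      seen++;
      // Simulates another thread registering a new ready timer mid-iteration.
      // A weakly consistent iterator does not throw ConcurrentModificationException,
      // but whether it later returns this new entry is unspecified.
      readyTimers.putIfAbsent("late-timer", 1582080923441L);
    }
    return seen;
  }

  // Builds a small map with two timers that are already ready.
  static Map<String, Long> newReadyTimers() {
    Map<String, Long> readyTimers = new ConcurrentHashMap<>();
    readyTimers.put("timer-1", 1582080923250L);
    readyTimers.put("timer-2", 1582080923250L);
    return readyTimers;
  }
}
```

Calling `iterateWhileMutating(newReadyTimers())` returns either 2 or 3: the two original entries are always visited, while the late entry may or may not be. This nondeterminism is why reasoning about bulk operations performed through such a view is fragile.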
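The following logs illustrate one scenario in which a timer survived the removal and was fired again along with the timers that became ready later. KeyedTimerData@f00ad0a7 (time 1582080923250) is returned in Run 1 and returned again in Run 2, even though it is not among the keys being removed in Run 2.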
Run 1
<!-- List of timers ready to be fired -->
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@6c00212c
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@1c2b5bf4
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@1322770f
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@8773354e
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@23eaee2a
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@c49a26d8
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@440d4e3f
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@6347daf8
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@582fd8b8
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@bebe094b
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@56e7e7dc
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@f00ad0a7
<!-- Snapshot of timers slotted to be removed and returned to run loop -->
Removing timers for : [TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@582fd8b8, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@c49a26d8, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@8773354e, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@23eaee2a, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@1c2b5bf4, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@56e7e7dc, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@6c00212c, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@bebe094b, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@f00ad0a7, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@1322770f, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@440d4e3f, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@6347daf8, time='1582080923250'}]
<!-- Size of the snapshot returned to run loop -->
Timers to be fired: 12

Run 2
<!-- List of timers ready to be fired -->
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@b6e279f2
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@a020aa78
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@57e4b472
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@6ac30e4e
removing key: org.apache.beam.runners.samza.runtime.KeyedTimerData@47d4a0d5
<!-- Snapshot of timers slotted to be removed and returned to run loop -->
Removing timers for : [TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@b6e279f2, time='1582080923441'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@6ac30e4e, time='1582080923441'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@47d4a0d5, time='1582080923441'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@f00ad0a7, time='1582080923250'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@a020aa78, time='1582080923441'},
  TimerKey{key=org.apache.beam.runners.samza.runtime.KeyedTimerData@57e4b472, time='1582080923441'}]
<!-- Size of the snapshot returned to run loop -->
Timers to be fired: 6
Instead of removing the whole snapshot from the local ready timers with entrySet().removeAll(), we now remove each timer from the map directly.
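A minimal sketch of the fix described above follows: snapshot the ready timers, then remove each snapshotted key from the map directly instead of doing a bulk removal through the entrySet() view. Everything here is an assumption for illustration. `ReadyTimerBook`, `markReady` and `pendingCount` are hypothetical names, and the backing `ConcurrentHashMap` and `LinkedHashMap` snapshot are guesses. This is not Samza's actual EpochTimeScheduler code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the ready-timer bookkeeping in
// EpochTimeScheduler; names and types are assumptions, not Samza's API.
class ReadyTimerBook<K, V> {
  // Timers whose fire time has passed, added by the scheduling thread.
  private final Map<K, V> readyTimers = new ConcurrentHashMap<>();

  void markReady(K key, V callback) {
    readyTimers.put(key, callback);
  }

  // Returns the timers that are ready right now and removes them from the book.
  Map<K, V> removeReadyTimers() {
    // Snapshot the currently ready timers; this copy is what the run loop fires.
    Map<K, V> snapshot = new LinkedHashMap<>(readyTimers);
    // Instead of a bulk removal through the entrySet() view
    // (e.g. readyTimers.entrySet().removeAll(snapshot.entrySet())),
    // remove each snapshotted timer from the map directly by key.
    for (K key : snapshot.keySet()) {
      readyTimers.remove(key);
    }
    return snapshot;
  }

  int pendingCount() {
    return readyTimers.size();
  }
}
```

With this shape, a timer returned by one call to `removeReadyTimers()` is no longer in the map, so a later call cannot return it again unless it is re-registered. One design note: `remove(key)` would also drop a timer re-added under the same key after the snapshot was taken. `Map.remove(key, value)`, which removes the entry only if it still maps to the snapshotted callback, is a stricter alternative if that case matters.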