Details
Type: Bug
Status: Resolved
Resolution: Fixed
2.8.0
None
Description
When reading from a JmsIO source, a ConcurrentModificationException can be thrown when checkpoint finalization occurs under heavy load.
For example:
jsonPayload: {
exception: "java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:903)
at java.util.ArrayList$Itr.next(ArrayList.java:853)
at org.apache.beam.sdk.io.jms.JmsCheckpointMark.finalizeCheckpoint(JmsCheckpointMark.java:65)
at com.google.cloud.dataflow.worker.StreamingModeExecutionContext$1.run(StreamingModeExecutionContext.java:379)
at com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:846)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"
job: "2018-09-27_08_55_18-6454085774348718625"
logger: "com.google.cloud.dataflow.worker.StreamingDataflowWorker"
message: "Source checkpoint finalization failed:"
thread: "309"
work: "<nil>"
worker: "test-andrew-092715504-09270855-tkfp-harness-dnmb"
}
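The top frames of the trace come from ArrayList's fail-fast iterator. ArrayList$Itr.next calls checkForComodification, which throws as soon as the list has been structurally modified after the iterator was created. The sketch below reproduces that check in a single thread so the failure is deterministic. It does not use JmsIO, and the class name FailFastDemo is hypothetical. In the reported bug the modification comes from a different thread, so the exception only appears when the timing lines up, which is consistent with it showing up under heavy load.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical demo class; not part of Beam or JmsIO.
public class FailFastDemo {

    /** Returns true if adding to the list mid-iteration trips the fail-fast check. */
    static boolean addDuringIterationThrows() {
        List<String> pending = new ArrayList<>();
        pending.add("msg-1");
        pending.add("msg-2");

        Iterator<String> it = pending.iterator();
        it.next();             // iteration is now in progress
        pending.add("msg-3");  // structural modification the iterator does not know about
        try {
            it.next();         // checkForComodification() sees the changed modCount
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("threw ConcurrentModificationException: " + addDuringIterationThrows());
    }
}
```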
Looking at the JmsCheckpointMark code, access to the pending message list appears to be unsynchronized. If finalizeCheckpoint iterates over the list on one thread while a separate processing thread adds messages to the same checkpoint mark, the ArrayList iterator detects the concurrent modification and throws a ConcurrentModificationException, as in the trace above.
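One way to remove this kind of race is to route every access to the pending list through a single lock. Under that lock, finalization swaps the list for a fresh one and then acknowledges the snapshot after releasing the lock, so no thread ever iterates a list that another thread can still modify. This is a minimal sketch under stated assumptions, not the change that resolved this issue. GuardedPendingList, its methods, and the Consumer-based acknowledge callback are hypothetical stand-ins for the JMS message acknowledgement the real code performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch; not the actual JmsCheckpointMark implementation or the committed fix.
public class GuardedPendingList<T> {

    private final ReentrantLock lock = new ReentrantLock();
    private List<T> pending = new ArrayList<>();

    /** Called by the reader thread for each message it consumes. */
    public void add(T message) {
        lock.lock();
        try {
            pending.add(message);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Called on checkpoint finalization. Swaps the list out while holding the lock,
     * then acknowledges the detached snapshot without holding it.
     * Returns the number of messages acknowledged.
     */
    public int finalizeCheckpoint(Consumer<? super T> acknowledge) {
        List<T> toAck;
        lock.lock();
        try {
            toAck = pending;
            pending = new ArrayList<>();
        } finally {
            lock.unlock();
        }
        for (T message : toAck) {
            acknowledge.accept(message);  // no other thread can reach toAck anymore
        }
        return toAck.size();
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedPendingList<Integer> list = new GuardedPendingList<>();
        int total = 100_000;
        List<Integer> acked = new ArrayList<>();  // only touched by the main thread

        Thread reader = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                list.add(i);
            }
        });
        reader.start();
        // Finalize repeatedly while the reader is still adding; no exception is expected.
        while (reader.isAlive()) {
            list.finalizeCheckpoint(acked::add);
        }
        reader.join();
        list.finalizeCheckpoint(acked::add);  // drain anything added after the last pass
        System.out.println("acknowledged " + acked.size() + " of " + total);
    }
}
```

Swapping the list, rather than iterating it while holding the lock, keeps the critical section short, so slow acknowledgements do not block the reader thread. Wrapping the list with Collections.synchronizedList alone would not be enough, because iterating such a list still requires manually synchronizing on it.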