Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
Affects Version/s: 2.0.0, 2.1.0, 2.2.0
None
None
Description
Occasionally, KinesisIO throws a ConcurrentModificationException while taking a checkpoint:
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:643)
    at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:409)
    at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:699)
    at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:256)
    at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:209)
    at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.<init>(KinesisReaderCheckpoint.java:44)
    at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.asCurrentStateOf(KinesisReaderCheckpoint.java:49)
    at org.apache.beam.sdk.io.kinesis.KinesisReader.getCheckpointMark(KinesisReader.java:137)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.snapshotState(UnboundedSourceWrapper.java:379)
    at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:100)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
    ... 11 more
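The top frames show an ArrayDeque iterator failing while ImmutableList.copyOf copies the deque inside the KinesisReaderCheckpoint constructor. The sketch below is a minimal, hypothetical reproduction of that failure mode. It is not Beam code, and the class and method names are invented. A single thread stands in for the interleaving: an iterator (the "checkpoint copy") is created, the deque is then rotated (the "reader" moving to the next shard), and the next call on the iterator throws.

```java
import java.util.ArrayDeque;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical demo (not Beam code): ArrayDeque iterators are fail-fast. */
public class ArrayDequeCmeDemo {

    /** Returns true if rotating the deque mid-iteration makes the iterator throw. */
    static boolean rotateDuringIteration() {
        Deque<String> shards = new ArrayDeque<>();
        shards.add("shard-0");
        shards.add("shard-1");
        shards.add("shard-2");

        // Stands in for the checkpoint starting to copy the deque.
        Iterator<String> copyIterator = shards.iterator();

        // Stands in for the reader rotating to the next shard: head moves to the tail.
        shards.addLast(shards.pollFirst());

        try {
            copyIterator.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // prints "ConcurrentModificationException thrown: true"
        System.out.println("ConcurrentModificationException thrown: " + rotateDuringIteration());
    }
}
```

In the real pipeline the rotation would presumably happen on a different thread from the one running snapshotState. That makes the failure intermittent rather than deterministic, which would explain why the exception only appears from time to time.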
What is the issue
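The org.apache.beam.sdk.io.kinesis.RoundRobin class is backed by java.util.ArrayDeque, which is not thread-safe. As the stack trace shows, the checkpoint copies this deque with ImmutableList.copyOf while (presumably) another thread is modifying it, so the deque's fail-fast iterator throws. Replacing the ArrayDeque with java.util.concurrent.ConcurrentLinkedDeque should fix the problem.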
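The proposed fix can be sketched as follows. This is a minimal illustration under stated assumptions. ConcurrentRoundRobin and its methods getCurrent, moveForward and snapshot are hypothetical names, not the actual API of Beam's RoundRobin class. The only point the sketch makes is that the deque is a ConcurrentLinkedDeque, whose iterators are weakly consistent and never throw ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * Hypothetical round-robin container (not Beam's RoundRobin) backed by a
 * ConcurrentLinkedDeque, so copying it while another thread rotates it
 * cannot throw ConcurrentModificationException.
 */
public class ConcurrentRoundRobin<T> {
    private final Deque<T> deque;

    public ConcurrentRoundRobin(Collection<? extends T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.deque = new ConcurrentLinkedDeque<>(items);
    }

    /** Returns the element at the head of the rotation. */
    public T getCurrent() {
        return deque.peekFirst();
    }

    /** Moves the head element to the tail (two separate, non-atomic steps). */
    public void moveForward() {
        T head = deque.pollFirst();
        if (head != null) {
            deque.addLast(head);
        }
    }

    /** Copies the current contents using the deque's weakly consistent iterator. */
    public List<T> snapshot() {
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) {
        ConcurrentRoundRobin<String> rr =
                new ConcurrentRoundRobin<>(Arrays.asList("shard-0", "shard-1", "shard-2"));
        rr.moveForward();
        System.out.println(rr.getCurrent()); // prints "shard-1"
        System.out.println(rr.snapshot());   // prints "[shard-1, shard-2, shard-0]"
    }
}
```

One design caveat: the concurrent deque removes the exception, but it does not make a rotation and a copy atomic with respect to each other. A snapshot taken in the middle of moveForward could miss the element in transit or see it twice. If the checkpoint must always contain every shard exactly once, guarding rotation and snapshot with a common lock may be the safer choice.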
Beam 2.3 (master branch)
The Kinesis connector has been heavily refactored in master, and the org.apache.beam.sdk.io.kinesis.RoundRobin class no longer exists there. How should we approach this fix?