Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Affects Version: 1.4.0
- Labels: None
Description
./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
It seems the List subscribedPartitionStates was being modified while runFetchLoop was iterating over it.
This can happen if, e.g., FlinkKafkaConsumer concurrently runs the following code:
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
java.util.ConcurrentModificationException
    at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
    at java.util.LinkedList$ListItr.next(LinkedList.java:888)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
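For illustration, a minimal standalone sketch of the failure mode (the class and variable names below are made up for the example; this is not the Flink code itself): one thread iterating a plain LinkedList while another thread appends to it fails the same way.

```
import java.util.LinkedList;
import java.util.List;

public class ComodificationDemo {
    public static void main(String[] args) throws Exception {
        // stands in for subscribedPartitionStates in the fetcher
        final List<Integer> partitionStates = new LinkedList<>();
        for (int i = 0; i < 1_000; i++) {
            partitionStates.add(i);
        }

        // stands in for the fetch loop: iterates the list over and over
        Thread fetchLoop = new Thread(() -> {
            for (int round = 0; round < 1_000; round++) {
                for (Integer ignored : partitionStates) {
                    // per-partition work would happen here
                }
            }
        }, "fetch-loop");

        // stands in for partition discovery: appends to the same list concurrently
        Thread discovery = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                partitionStates.add(i);
            }
        }, "partition-discovery");

        fetchLoop.start();
        discovery.start();
        fetchLoop.join();   // the fetch-loop thread typically dies with ConcurrentModificationException
        discovery.join();
    }
}
```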
Attachments
- Flink9349Test.java (6 kB, attached by Sergey Nuyanzin)
Issue Links
- duplicates: FLINK-13204 "The subscribedPartitionStates should be thread safe." (Closed)
- links to
Activity
It seems synchronization should be added around both adding to and iterating over the subscribedPartitionStates List.
Hello,
I was able to write a test (based on an existing one) that reproduces this and one more related issue.
The second one is:
Caused by: java.lang.NullPointerException
    at java.util.LinkedList$ListItr.next(LinkedList.java:893)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.doCommitInternalOffsetsToKafka(Kafka09Fetcher.java:228)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.commitInternalOffsetsToKafka(AbstractFetcher.java:293)
It is likewise a result of concurrent access to the non-thread-safe subscribedPartitionStates (a LinkedList).
From my point of view there are two possible solutions:
- add synchronization, as you mentioned
- use a thread-safe collection, e.g. CopyOnWriteArrayList instead of LinkedList, for subscribedPartitionStates
Both options pass the test; the code snippet for the test is attached as Flink9349Test.java.
With the second option no synchronized block is required, which is attractive if partition updates are infrequent; otherwise explicit synchronization makes more sense. (A minimal sketch of the second option follows below.)
yuzhihong@gmail.com could you please point to the more appropriate approach?
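For reference, the second option in the same standalone sketch form (again illustrative names only, not the actual Flink classes): a CopyOnWriteArrayList iterator works on a snapshot of the backing array, so concurrent adds no longer invalidate an ongoing iteration.

```
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static void main(String[] args) throws Exception {
        // same scenario as the LinkedList sketch above, but with a snapshot-iterating list
        final List<Integer> partitionStates = new CopyOnWriteArrayList<>();

        Thread fetchLoop = new Thread(() -> {
            for (int round = 0; round < 1_000; round++) {
                for (Integer ignored : partitionStates) {
                    // per-partition work; safe even while the other thread adds entries
                }
            }
        }, "fetch-loop");

        Thread discovery = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                partitionStates.add(i);   // each add copies the backing array
            }
        }, "partition-discovery");

        fetchLoop.start();
        discovery.start();
        fetchLoop.join();
        discovery.join();
        System.out.println("no ConcurrentModificationException, size = " + partitionStates.size());
    }
}
```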
This indeed looks like a problem with concurrently accessing the partition state list.
However, I would like to avoid synchronizing the iteration if possible. That loop is a critical path, and adding synchronization there could be harmful for performance. CopyOnWriteArrayList seems like a better approach here; the costly add operation is acceptable since partition discovery should not happen often anyway.
Sergey Nuyanzin this looks like a blocker bug that we probably should fix ASAP.
Are you working on this already? If yes, do you have an ETA on the PR?
If possible, we should try to get this fix in 1.5.0, which is just around the corner.
> both options pass the test
It seems Sergey has already tried using CopyOnWriteArrayList.
It would be better if Sergey provides the PR, since he has created the test (and tried a fix).
Sergey Nuyanzin, your attached test file looks good.
We can probably move forward by preparing the PR. Are you familiar with the process?
Sergey:
Please add the Apache license header to the test when you create the PR.
tzulitai, thank you for your comment.
I've started working on it (set the issue to In Progress); give me several minutes.
trohrmann@apache.org:
Please see the above.
It would be better if the fix could be part of the next RC.
I've just talked offline with Till about this issue.
We should try to get a fix for this ASAP, but not block 1.5.0 RCs on this since it was already a bug in 1.4.0.
We should at least get the fix in soon, so that in case we have to open yet another RC, it will be included.
GitHub user snuyanzin opened a pull request:
https://github.com/apache/flink/pull/6040
FLINK-9349[Kafka Connector] KafkaConnector Exception while fetching from multiple kafka topics
Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.
Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.
-
- Contribution Checklist
- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where FLINK-XXXX should be replaced by the actual issue number. Skip component if you are unsure about which is the best component.
Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id)
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
*(The sections below can be removed for hotfixes of typos)*
-
- What is the purpose of the change
(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)
-
- Brief change log
- fix the synchronization issue
- add a test to verify it
-
- Verifying this change
This change added tests and can be verified as follows:
via org.apache.flink.streaming.connectors.kafka.internal.Flink9349Test#testConcurrentPartitionsDiscoveryAndLoopFetching
-
- Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
-
- Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/snuyanzin/flink FLINK-9349_KafkaConnector_Exception_while_fetching_from_multiple_kafka_topics
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6040.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6040
commit 8de5f37549607460659e171f9c3b48d0090383c0
Author: snuyanzin <snuyanzin@...>
Date: 2018-05-17T16:12:04Z
added test and fix for FLINK-9349 by usage of CopyOnWriteArrayList
commit eee524e2d2a86af5252ed939000c12a2604917e9
Author: snuyanzin <snuyanzin@...>
Date: 2018-05-17T16:35:10Z
fix checkstyle
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189031751
— Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the
.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
— End diff –
Likewise, Kafka 08 / 09 / 010 / 011 should all have this test coverage.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189031075
— Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
@@ -0,0 +1,207 @@
[... license header, imports, and class Javadoc identical to the diff context quoted above ...]
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
— End diff –
I think we should have a similar test, but move it to `Kafka09FetcherTest`.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189031464
— Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
@@ -0,0 +1,207 @@
[... license header, imports, and class Javadoc identical to the diff context quoted above ...]
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation)
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceFunction.SourceContext<String> sourceContext = mock(SourceFunction.SourceContext.class);
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ partitionsWithInitialOffsets,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ "task_name",
+ schema,
+ new Properties(),
+ 0L,
+ new UnregisteredMetricsGroup(),
+ new UnregisteredMetricsGroup(),
+ false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ int fetchTasks = 2;
+ final CountDownLatch latch = new CountDownLatch(fetchTasks);
+ ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
+
+ service.submit(new Thread("fetcher runner ") {
+ @Override
+ public void run() {
+ try
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ });
+ for (int i = 0; i < fetchTasks; i++) {
+ service.submit(new Thread("add partitions " + i) {
+
+ @Override
+ public void run() {
+ try {
+ List<KafkaTopicPartition> newPartitions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++)
+ fetcher.addDiscoveredPartitions(newPartitions);
+ latch.countDown();
+ //latch.await();
+ for (int i = 0; i < 100; i++)
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ });
+ }
+
+ service.awaitTermination(1L, TimeUnit.SECONDS);
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
— End diff –
I don't think this is required for the scope of interest of this test, is it?
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189029077
— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
@@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
- List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
+ List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
— End diff –
It would be nice to have a comment on why we need to use a `CopyOnWriteArrayList`.
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/6040
Thanks for the PR @snuyanzin!
I had some comments, please let me know what you think.
Also, some general contribution tips:
1. I would suggest the title of the PR to be something along the lines of "FLINK-9349 [kafka] Fix ConcurrentModificationException when add discovered partitions". That directly makes it clear what exactly is being fixed.
2. The message of the first commit of the PR should also be appropriately set to be similar to the title (most of the time if it is a 1-commit PR, the title of the PR and the commit message can be identical).
Github user snuyanzin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189035401
— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
@@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
- List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
+ List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
— End diff –
Yes, you are right. A question: is it allowed to link to the issue comment where it was decided to use CopyOnWriteArrayList? Or is it better to have the explanation in a code comment only?
Github user tedyu commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189036753
— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
@@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
- List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
+ List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
— End diff –
The explanation can be made with a comment.
No need to link to the issue comment.
Github user snuyanzin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189038623
— Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
@@ -0,0 +1,207 @@
[... license header, imports, and class Javadoc identical to the diff context quoted above ...]
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
— End diff –
Do you mean having such a test as a separate method in each KafkaXYFetcherTest class?
Github user snuyanzin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r189168596
— Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
@@ -0,0 +1,207 @@
[... diff context identical to the one quoted in the earlier review comment above ...]
— End diff –
Thank you, you are right. I removed it, along with some other unneeded code, from this test.
Github user snuyanzin commented on the issue:
https://github.com/apache/flink/pull/6040
@tzulitai, @tedyu thank you for your review, comments, and contribution tips.
I made updates, which include moving the test into AbstractFetcherTest and making it independent of the Kafka connector version.
Could you please help me a bit?
Suddenly the Travis build failed on YARNSessionCapacitySchedulerITCase (only on the Flink Travis; on my fork it passed several times). It does not look like a result of my changes, as there is nothing related to YARN. Anyway, I tried to investigate it and found several similar issues in JIRA, but they are closed.
Also, I downloaded the logs mentioned in the failed Travis job:
> Uploading to transfer.sh
https://transfer.sh/JspTz/24547.10.tar.gz
Based on them, it looks like there was a connectivity issue with one of the ApplicationMasters,
as the log yarn-tests/container_1526608500321_0007_01_000001/job-manager.log is full of:
> 2018-05-18 01:56:49,448 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980
2018-05-18 01:56:49,449 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980]] Caused by: [Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980]
A very strange thing:
> Remote connection to [null]
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/6040
@snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore it for now. I'll take another look at your changes soon. Thanks!
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190111078
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
+ Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+ Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+ final TestFetcher<String> fetcher = new TestFetcher<>(
+ sourceContext,
+ partitionsWithInitialOffsets,
+ null, /* periodic assigner */
+ null, /* punctuated assigner */
+ new TestProcessingTimeService(),
+ 10);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
— End diff –
Flink provides a `CheckedThread` utility so you don't have to do this thread error referencing.
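For illustration, roughly how the runner could look with the `CheckedThread` test utility (a sketch only; `fetcher` refers to the test fetcher created above, and `sync()` rethrows whatever `go()` threw):

```
import org.apache.flink.core.testutils.CheckedThread;

// instead of stashing failures in an AtomicReference<Throwable>,
// let the thread record them and rethrow on sync()
final CheckedThread fetcherRunner = new CheckedThread() {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop();   // anything thrown here is kept by the CheckedThread
    }
};
fetcherRunner.start();

// ... exercise addDiscoveredPartitions(...) from the test thread ...

// rethrows whatever go() threw, so the test fails with the original exception
fetcherRunner.sync();
```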
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190110928
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
+ @Test
+ public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
— End diff –
It is unnecessary to use a power mock here. A dummy implementation of a `SourceContext` will be better.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190111239
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
[... test data and fetcher setup identical to the hunk quoted in the previous comment ...]
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ int fetchTasks = 5;
+ final CountDownLatch latch = new CountDownLatch(fetchTasks);
+ ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
+
+ service.submit(new Thread("fetcher runner") {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ });
+
+ for (int i = 0; i < fetchTasks; i++) {
+ service.submit(new Thread("add partitions " + i) {
+ @Override
+ public void run() {
+ try {
+ List<KafkaTopicPartition> newPartitions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++)
+ fetcher.addDiscoveredPartitions(newPartitions);
+ latch.countDown();
+ for (int i = 0; i < 100; i++)
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ });
+ }
+
+ service.awaitTermination(1L, TimeUnit.SECONDS);
+
+ // ----- trigger the offset commit -----
— End diff –
We should be able to ignore offset commit triggering in this test
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190114844
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
[... test setup identical to the hunk quoted above ...]
+ service.submit(new Thread("fetcher runner") {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
— End diff –
The sequence here seems a bit odd to me.
I think we should be testing this as follows:
1. Run the fetch loop, and let it be blocked on record emitting (which then should let it be blocked mid-iteration)
2. Add a discovered partition; this should not throw an exception.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190112933
— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
@@ -507,7 +507,11 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
- List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
+ /**
+ * CopyOnWrite as adding discovered partitions could happen in parallel
+ * with different threads iterating by {@link AbstractFetcher#subscribedPartitionStates} results
+ */
— End diff –
I think we usually don't have Javadoc blocks within methods. A regular comment with `//` would do.
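For example, something along these lines (the exact wording of the comment is, of course, up to the author):

```
// use a CopyOnWriteArrayList here: the fetch loop iterates this list while
// partition discovery may concurrently add newly discovered partitions to it
List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
```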
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190120209
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
[... test setup identical to the hunk quoted above ...]
+ service.submit(new Thread("fetcher runner") {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
— End diff –
The final `checkedThread.sync()` would always fail with the `ConcurrentModificationException` if the test is designed like this.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190111529
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -416,9 +520,16 @@ protected TestFetcher(
false);
}
+ /**
+ * Emulation of partition's iteration which is required for
+ *
.
+ * @throws Exception
+ */
@Override
public void runFetchLoop() throws Exception {
- throw new UnsupportedOperationException();
+ for (KafkaTopicPartitionState ignored: subscribedPartitionStates()) {
+ Thread.sleep(10L);
— End diff –
This would only let the test fail "occasionally", right?
I would like this to be changed, so that we always have the test failing without the copy on write fix.
We could do this by having a dummy source context that blocks on record emit.
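One possible shape for such a dummy context, as a sketch only — the class name and latch fields here are assumptions for illustration, not the final test code:

```
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/** A SourceContext whose collect() parks the fetch loop until the test releases it. */
final class BlockingSourceContext<T> implements SourceFunction.SourceContext<T> {

    final OneShotLatch inBlockingEmit = new OneShotLatch();   // signals "loop is now inside emit"
    final OneShotLatch continueEmit = new OneShotLatch();     // released by the test after addDiscoveredPartitions

    private final Object checkpointLock = new Object();

    @Override
    public void collect(T element) {
        inBlockingEmit.trigger();
        try {
            continueEmit.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        collect(element);
    }

    @Override
    public void emitWatermark(Watermark mark) {}

    @Override
    public void markAsTemporarilyIdle() {}

    @Override
    public Object getCheckpointLock() {
        return checkpointLock;
    }

    @Override
    public void close() {}
}
```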
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190120134
— Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}
[... test setup identical to the hunk quoted above ...]
+ service.submit(new Thread("fetcher runner") {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
— End diff –
So, IMO, the test should look something like this:
```
final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
final OneShotLatch stateIterationBlockLatch = new OneShotLatch();

final TestFetcher<String> fetcher = new TestFetcher<>(
	sourceContext,
	partitionsWithInitialOffsets,
	null, /* periodic assigner */
	null, /* punctuated assigner */
	new TestProcessingTimeService(),
	10,
	fetchLoopWaitLatch,
	stateIterationBlockLatch);

// ----- run the fetcher -----

final CheckedThread checkedThread = new CheckedThread() {
	@Override
	public void go() throws Exception {
		fetcher.runFetchLoop();
	}
};
checkedThread.start();

// wait until state iteration begins before adding discovered partitions
fetchLoopWaitLatch.await();
fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));

stateIterationBlockLatch.trigger();
checkedThread.sync();
```
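To make the role of the two latches concrete, here is a sketch of how `TestFetcher#runFetchLoop` could use them (an illustration consistent with the outline above, not necessarily the merged code):

```
@Override
public void runFetchLoop() throws Exception {
	// iterate the shared partition state list, pausing inside the iteration so that
	// the test can call addDiscoveredPartitions() while the iterator is still open
	for (KafkaTopicPartitionState<?> ignored : subscribedPartitionStates()) {
		fetchLoopWaitLatch.trigger();        // tell the test that iteration has started
		stateIterationBlockLatch.await();    // wait until the test has added a partition
	}
}
```

With the original LinkedList, the iterator's next step after the concurrent add would throw the ConcurrentModificationException; with the CopyOnWriteArrayList fix the loop completes normally and `checkedThread.sync()` succeeds.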
Merged. Fixed via:
1.6.0 - 049994274c9d4fc07925a7639e4044506b090d10
1.5.1 - bc4a402d09304a21c82299e368442c8a6e4ae427
1.4.3 - 61c44d902d081ad5bf0e1654f62f70567d25fde8
Github user snuyanzin commented on the issue:
https://github.com/apache/flink/pull/6040
@tzulitai thank you for your review and comments
Based on your comments I have a question; could you please clarify it?
You mentioned Flink's `OneShotLatch` and `CheckedThread`, while at the same time some Kafka connector tests use `AtomicReference`, `Thread`, etc. (I used one of them as an example while writing my version of the test). Just to be on the safe side, am I right that `OneShotLatch` and `CheckedThread` are preferable in tests, or are there some rules/limitations/whatever?
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/6040
Using `CheckedThread` is more preferable, as it simplifies some of the test code.
But yes, the utility was introduced at a later point in time in Flink, so some parts of the test code might still be using `Thread`s and `AtomicReference`s.
Because of the cancelled 1.5.0 RC5, this fix will make it into 1.5.0 RC6. FYI tzulitai, you might want to run some additional tests for 1.5.0 RC6 to make sure everything works as expected, because this bug fix was not covered by the normal release testing, and many voters will probably just carry over their votes to the new RC without testing RC6.
pnowojski thanks for the reminder. Yes, I will test this with RC6.
My root cause analysis was based on a quick inspection of the code.
Vishal, can you attach the complete stack trace if you have it?
If you can describe your flow (or write a unit test) that reproduces the exception, that would help find the root cause.