Flink / FLINK-9349

KafkaConnector Exception while fetching from multiple kafka topics

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.3, 1.5.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
       
      It seems the subscribedPartitionStates List was being modified while runFetchLoop was iterating over it.
      This can happen if, e.g., FlinkKafkaConsumer concurrently runs the following code:
                      kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
       

       java.util.ConcurrentModificationException
      	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
      	at java.util.LinkedList$ListItr.next(LinkedList.java:888)
      	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
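
      For context, the failure is easy to reproduce outside Flink: iterating a plain LinkedList while another thread appends to it fails fast. A minimal stand-alone sketch (hypothetical demo code, not Flink code):

        import java.util.LinkedList;
        import java.util.List;

        public class ComodificationDemo {
            public static void main(String[] args) throws InterruptedException {
                List<Integer> list = new LinkedList<>();
                for (int i = 0; i < 1_000; i++) {
                    list.add(i);
                }
                // Writer keeps appending, like addDiscoveredPartitions(...).
                Thread writer = new Thread(() -> {
                    for (int i = 0; i < 1_000_000; i++) {
                        list.add(i);
                    }
                });
                writer.start();
                // Reader iterates, like runFetchLoop(); this typically throws
                // java.util.ConcurrentModificationException almost immediately.
                long sum = 0;
                for (int value : list) {
                    sum += value;
                }
                writer.join();
                System.out.println(sum);
            }
        }

      Because LinkedList is not thread-safe, the same race can also surface as a NullPointerException inside the iterator rather than a ConcurrentModificationException, which matches the second stack trace reported later in this thread.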
      

      Attachments

        1. Flink9349Test.java
          6 kB
          Sergey Nuyanzin

        Issue Links

          Activity

            yuzhihong@gmail.com Ted Yu added a comment -

            The root cause analysis, from me, was based on a quick inspection of the code.

            Vishal, can you attach the complete stack trace if you have it?

            If you can describe your flow (or write a unit test) that reproduces the exception, that would help find the root cause.

            yuzhihong@gmail.com Ted Yu added a comment -

            It seems synchronization should be added around both adding to and iterating over the subscribedPartitionStates List.
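
            A minimal sketch of that approach (a hypothetical simplification with a String element type, not the actual fetcher code): both the add path and the iterating path take the same lock.

                import java.util.LinkedList;
                import java.util.List;

                // Hypothetical stand-in for the fetcher's partition-state bookkeeping.
                class SynchronizedStates {
                    private final List<String> states = new LinkedList<>();

                    // Called from the partition-discovery thread.
                    void addAll(List<String> newStates) {
                        synchronized (states) {
                            states.addAll(newStates);
                        }
                    }

                    // Called from the fetch loop; holds the same lock while iterating.
                    int count() {
                        synchronized (states) {
                            int n = 0;
                            for (String ignored : states) {
                                n++;
                            }
                            return n;
                        }
                    }
                }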

            Sergey Nuyanzin Sergey Nuyanzin added a comment - - edited

            Hello,
            I was able to write a test (based on an existing one) that reproduces this and one more related issue.
            The second one is:

            Caused by: java.lang.NullPointerException
            	at java.util.LinkedList$ListItr.next(LinkedList.java:893)
            	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.doCommitInternalOffsetsToKafka(Kafka09Fetcher.java:228)
            	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.commitInternalOffsetsToKafka(AbstractFetcher.java:293)

            as a result of concurrent access to the non-thread-safe subscribedPartitionStates (a LinkedList).
            From my point of view there are two possible solutions:

            1. add synchronization, as you mentioned
            2. use a thread-safe collection, e.g. CopyOnWriteArrayList, instead of LinkedList for subscribedPartitionStates

            Both options pass the test.
            By the way, the code snippet for the test is attached as Flink9349Test.java.

            With the second option no synchronized is required, which is attractive if partition changes are infrequent; otherwise it makes sense to use synchronization.
            yuzhihong@gmail.com could you please point to the more appropriate approach?
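
            A minimal sketch of the second option (again a hypothetical simplification, not the actual patch): swapping the LinkedList for a CopyOnWriteArrayList makes iteration safe without any locking, because every iterator works on a snapshot of the backing array.

                import java.util.List;
                import java.util.concurrent.CopyOnWriteArrayList;

                // Hypothetical simplification of option 2: no locks anywhere.
                class CowStates {
                    // Adds copy the backing array; iteration is lock-free and can
                    // never throw ConcurrentModificationException.
                    private final List<String> states = new CopyOnWriteArrayList<>();

                    void addAll(List<String> newStates) {
                        states.addAll(newStates);
                    }

                    int count() {
                        int n = 0;
                        for (String ignored : states) { // iterates over an immutable snapshot
                            n++;
                        }
                        return n;
                    }
                }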


            tzulitai Tzu-Li (Gordon) Tai added a comment -

            This indeed looks like a problem with concurrently accessing the partition state list.

            However, I would like to avoid synchronizing the iteration, if possible. That loop is on a critical path, and adding synchronization there could be harmful for performance. CopyOnWriteArrayList seems like a better approach here; the costly add operation is OK, since partition discovery should not happen often anyway.
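
            The snapshot semantics behind this trade-off can be seen in a small stand-alone demo (hypothetical, for illustration only): an iterator created before an add never observes the new element and never fails, while the add itself pays for a full array copy.

                import java.util.Iterator;
                import java.util.List;
                import java.util.concurrent.CopyOnWriteArrayList;

                public class CowSnapshotDemo {
                    public static void main(String[] args) {
                        List<String> partitions = new CopyOnWriteArrayList<>();
                        partitions.add("topic-0");

                        Iterator<String> it = partitions.iterator(); // snapshot of ["topic-0"]
                        partitions.add("topic-1");                   // copies the array; iterator unaffected

                        while (it.hasNext()) {
                            System.out.println(it.next());           // prints only "topic-0", no exception
                        }
                        System.out.println(partitions.size());       // 2
                    }
                }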

            tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

            Sergey Nuyanzin this looks like a blocker bug that we probably should fix ASAP.
            Are you working on this already? If yes, do you have an ETA on the PR?
            If possible, we should try to get this fix in 1.5.0, which is just around the corner.

            yuzhihong@gmail.com Ted Yu added a comment -

            both options pass the test

            It seems Sergey has already tried using CopyOnWriteArrayList.
            It would be better if Sergey can provide the PR, since he has created the test (and tried a fix).


            tzulitai Tzu-Li (Gordon) Tai added a comment -

            Sergey Nuyanzin your attached file for the test looks good.
            We can probably move forward by preparing the PR. Are you familiar with the process?

            yuzhihong@gmail.com Ted Yu added a comment -

            Sergey:
            Please add the Apache license header to the test when you create the PR.


            Sergey Nuyanzin Sergey Nuyanzin added a comment -

            tzulitai thank you for your comment.
            I have started and set the issue to In Progress; give me several minutes.

            yuzhihong@gmail.com Ted Yu added a comment -

            trohrmann@apache.org:
            Please see the above.

            It would be better if the fix can be part of next RC.

            tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

            I've just talked offline with Till about this issue.
            We should try to get a fix for this ASAP, but not block the 1.5.0 RCs on it, since it was already a bug in 1.4.0.
            If we at least get the fix in soon, then in case we have to open yet another RC, it will be included.

            githubbot ASF GitHub Bot added a comment -

            GitHub user snuyanzin opened a pull request:

            https://github.com/apache/flink/pull/6040

            [FLINK-9349] [Kafka Connector] KafkaConnector Exception while fetching from multiple kafka topics

            Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.

            Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.

              Contribution Checklist
            • Make sure that the pull request corresponds to a JIRA issue (https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
            • Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where FLINK-XXXX should be replaced by the actual issue number. Skip the component if you are unsure about which is the best component.
              Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
            • Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
            • Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following this guide: http://flink.apache.org/contribute-code.html#best-practices.
            • Each pull request should address only one issue, not mix up code from multiple issues.
            • Each commit in the pull request has a meaningful commit message (including the JIRA id).
            • Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

            *(The sections below can be removed for hotfixes of typos)*

              What is the purpose of the change

            (For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)

              Brief change log

            fix the synchronization issue
            + a test to verify it

              Verifying this change

            This change added tests and can be verified as follows:
            via org.apache.flink.streaming.connectors.kafka.internal.Flink9349Test#testConcurrentPartitionsDiscoveryAndLoopFetching
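
            For reference, one way to run just that test locally (assuming a standard Maven checkout; a hypothetical invocation, the exact module path may differ):

                $ mvn -pl flink-connectors/flink-connector-kafka-0.9 -am test -Dtest=Flink9349Test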

              Does this pull request potentially affect one of the following parts:
            • Dependencies (does it add or upgrade a dependency): (no)
            • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
            • The serializers: (no)
            • The runtime per-record code paths (performance sensitive): (no)
            • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
            • The S3 file system connector: (no)

              Documentation
            • Does this pull request introduce a new feature? (no)
            • If yes, how is the feature documented? (not applicable)

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/snuyanzin/flink FLINK-9349_KafkaConnector_Exception_while_fetching_from_multiple_kafka_topics

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/6040.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #6040


            commit 8de5f37549607460659e171f9c3b48d0090383c0
            Author: snuyanzin <snuyanzin@...>
            Date: 2018-05-17T16:12:04Z

            added test and fix for FLINK-9349 by usage of CopyOnWriteArrayList

            commit eee524e2d2a86af5252ed939000c12a2604917e9
            Author: snuyanzin <snuyanzin@...>
            Date: 2018-05-17T16:35:10Z

            fix checkstyle


            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189031751

            — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
            @@ -0,0 +1,207 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.connectors.kafka.internal;
            +
            +import org.apache.flink.api.common.serialization.SimpleStringSchema;
            +import org.apache.flink.core.testutils.MultiShotLatch;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
            +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
            +
            +import org.apache.kafka.clients.consumer.ConsumerRecords;
            +import org.apache.kafka.clients.consumer.KafkaConsumer;
            +import org.junit.Test;
            +import org.junit.runner.RunWith;
            +import org.mockito.invocation.InvocationOnMock;
            +import org.mockito.stubbing.Answer;
            +import org.powermock.core.classloader.annotations.PrepareForTest;
            +import org.powermock.modules.junit4.PowerMockRunner;
            +
            +import java.util.ArrayList;
            +import java.util.Collections;
            +import java.util.HashMap;
            +import java.util.List;
            +import java.util.Map;
            +import java.util.Properties;
            +import java.util.concurrent.CountDownLatch;
            +import java.util.concurrent.ExecutorService;
            +import java.util.concurrent.Executors;
            +import java.util.concurrent.TimeUnit;
            +import java.util.concurrent.atomic.AtomicReference;
            +
            +import static org.junit.Assert.assertFalse;
            +import static org.mockito.Mockito.anyLong;
            +import static org.powermock.api.mockito.PowerMockito.doAnswer;
            +import static org.powermock.api.mockito.PowerMockito.mock;
            +import static org.powermock.api.mockito.PowerMockito.when;
            +import static org.powermock.api.mockito.PowerMockito.whenNew;
            +
            +/**
            + * Unit tests for the {@link Flink9349Test}.
            + */
            +@RunWith(PowerMockRunner.class)
            +@PrepareForTest(KafkaConsumerThread.class)
            +public class Flink9349Test {
            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            — End diff –

            Likewise, Kafka 08 / 09 / 010 / 011 should all have this test coverage.

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189031075

            — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
            @@ -0,0 +1,207 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.connectors.kafka.internal;
            +
            +import org.apache.flink.api.common.serialization.SimpleStringSchema;
            +import org.apache.flink.core.testutils.MultiShotLatch;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
            +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
            +
            +import org.apache.kafka.clients.consumer.ConsumerRecords;
            +import org.apache.kafka.clients.consumer.KafkaConsumer;
            +import org.junit.Test;
            +import org.junit.runner.RunWith;
            +import org.mockito.invocation.InvocationOnMock;
            +import org.mockito.stubbing.Answer;
            +import org.powermock.core.classloader.annotations.PrepareForTest;
            +import org.powermock.modules.junit4.PowerMockRunner;
            +
            +import java.util.ArrayList;
            +import java.util.Collections;
            +import java.util.HashMap;
            +import java.util.List;
            +import java.util.Map;
            +import java.util.Properties;
            +import java.util.concurrent.CountDownLatch;
            +import java.util.concurrent.ExecutorService;
            +import java.util.concurrent.Executors;
            +import java.util.concurrent.TimeUnit;
            +import java.util.concurrent.atomic.AtomicReference;
            +
            +import static org.junit.Assert.assertFalse;
            +import static org.mockito.Mockito.anyLong;
            +import static org.powermock.api.mockito.PowerMockito.doAnswer;
            +import static org.powermock.api.mockito.PowerMockito.mock;
            +import static org.powermock.api.mockito.PowerMockito.when;
            +import static org.powermock.api.mockito.PowerMockito.whenNew;
            +
            +/**
            + * Unit tests for the {@link Flink9349Test}.
            + */
            +@RunWith(PowerMockRunner.class)
            +@PrepareForTest(KafkaConsumerThread.class)
            +public class Flink9349Test {
            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            — End diff –

            I think we should have a similar test, but move it to `Kafka09FetcherTest`.

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189031464

            — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
            @@ -0,0 +1,207 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.connectors.kafka.internal;
            +
            +import org.apache.flink.api.common.serialization.SimpleStringSchema;
            +import org.apache.flink.core.testutils.MultiShotLatch;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
            +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
            +
            +import org.apache.kafka.clients.consumer.ConsumerRecords;
            +import org.apache.kafka.clients.consumer.KafkaConsumer;
            +import org.junit.Test;
            +import org.junit.runner.RunWith;
            +import org.mockito.invocation.InvocationOnMock;
            +import org.mockito.stubbing.Answer;
            +import org.powermock.core.classloader.annotations.PrepareForTest;
            +import org.powermock.modules.junit4.PowerMockRunner;
            +
            +import java.util.ArrayList;
            +import java.util.Collections;
            +import java.util.HashMap;
            +import java.util.List;
            +import java.util.Map;
            +import java.util.Properties;
            +import java.util.concurrent.CountDownLatch;
            +import java.util.concurrent.ExecutorService;
            +import java.util.concurrent.Executors;
            +import java.util.concurrent.TimeUnit;
            +import java.util.concurrent.atomic.AtomicReference;
            +
            +import static org.junit.Assert.assertFalse;
            +import static org.mockito.Mockito.anyLong;
            +import static org.powermock.api.mockito.PowerMockito.doAnswer;
            +import static org.powermock.api.mockito.PowerMockito.mock;
            +import static org.powermock.api.mockito.PowerMockito.when;
            +import static org.powermock.api.mockito.PowerMockito.whenNew;
            +
            +/**
            + * Unit tests for the {@link Flink9349Test}.
            + */
            +@RunWith(PowerMockRunner.class)
            +@PrepareForTest(KafkaConsumerThread.class)
            +public class Flink9349Test {
            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            +
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // to synchronize when the consumer is in its blocking method
            + final OneShotLatch sync = new OneShotLatch();
            +
            + // ----- the mock consumer with blocking poll calls ----
            + final MultiShotLatch blockerLatch = new MultiShotLatch();
            +
            + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
            + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
            +
            + @Override
            + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
            + sync.trigger();
            + blockerLatch.await();
            + return ConsumerRecords.empty();
            + }
            + });
            +
            + doAnswer(new Answer<Void>() {
            + @Override
            + public Void answer(InvocationOnMock invocation) {
            + blockerLatch.trigger();
            + return null;
            + }
            + }).when(mockConsumer).wakeup();
            +
            + // make sure the fetcher creates the mock consumer
            + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceFunction.SourceContext<String> sourceContext = mock(SourceFunction.SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
            +
            + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic watermark extractor */
            + null, /* punctuated watermark extractor */
            + new TestProcessingTimeService(),
            + 10, /* watermark interval */
            + this.getClass().getClassLoader(),
            + "task_name",
            + schema,
            + new Properties(),
            + 0L,
            + new UnregisteredMetricsGroup(),
            + new UnregisteredMetricsGroup(),
            + false);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 2;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner ") {
            + @Override
            + public void run() {
            + try {
            + latch.await();
            + fetcher.runFetchLoop();
            + } catch (Throwable t) {
            + error.set(t);
            + }
            + }
            + });
            + for (int i = 0; i < fetchTasks; i++) {
            + service.submit(new Thread("add partitions " + i) {
            +
            + @Override
            + public void run() {
            + try {
            + List<KafkaTopicPartition> newPartitions = new ArrayList<>();
            + for (int i = 0; i < 1000; i++) {
            + newPartitions.add(testPartition);
            + }

            + fetcher.addDiscoveredPartitions(newPartitions);
            + latch.countDown();
            + //latch.await();
            + for (int i = 0; i < 100; i++) {
            + fetcher.addDiscoveredPartitions(newPartitions);
            + Thread.sleep(1L);
            + }
            + } catch (Throwable t) {
            + error.set(t);
            + }

            + }
            + });
            + }
            +
            + service.awaitTermination(1L, TimeUnit.SECONDS);
            +
            + // wait until the fetcher has reached the method of interest
            + sync.await();
            +
            + // ----- trigger the offset commit -----
            — End diff –

            I don't think this is required for the scope of interest of this test, is it?

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189029077

            — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
            @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

            - List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
            + List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
            — End diff –

            Would be nice to have a comment on why we need to use a `CopyOnWriteArrayList`
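
            For example, such a comment could read roughly like this (a sketch of one possible wording, not the wording that was eventually merged):

                // Use a CopyOnWriteArrayList: the fetch loop iterates this list concurrently
                // with partition discovery appending to it. Copy-on-write keeps iteration
                // lock-free and safe, and the O(n) copy on add is acceptable because
                // partition discovery is infrequent.
                List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();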

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on the issue:

            https://github.com/apache/flink/pull/6040

            Thanks for the PR @snuyanzin!
            I had some comments, please let me know what you think.

            Also, some general contribution tips:
            1. I would suggest the title of the PR to be something along the lines of "[FLINK-9349] [kafka] Fix ConcurrentModificationException when adding discovered partitions". That directly makes it clear what exactly is being fixed.
            2. The message of the first commit of the PR should also be appropriately set to be similar to the title (most of the time if it is a 1-commit PR, the title of the PR and the commit message can be identical).

            githubbot ASF GitHub Bot added a comment -

            Github user snuyanzin commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189035401

            — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
            @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

            - List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
            + List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
            — End diff –

            Yes, you are right. A question: is it allowed to link to the issue comment where it was decided to use CopyOnWriteArrayList? Or is it better to have the explanation in a code comment only?

            githubbot ASF GitHub Bot added a comment -

            Github user tedyu commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189036753

            — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
            @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

            - List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
            + List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
            — End diff –

            The explanation can be made with a code comment.
            No need to link to the issue comment.

            githubbot ASF GitHub Bot added a comment -

            Github user snuyanzin commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189038623

            — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
            @@ -0,0 +1,207 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.connectors.kafka.internal;
            +
            +import org.apache.flink.api.common.serialization.SimpleStringSchema;
            +import org.apache.flink.core.testutils.MultiShotLatch;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
            +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
            +
            +import org.apache.kafka.clients.consumer.ConsumerRecords;
            +import org.apache.kafka.clients.consumer.KafkaConsumer;
            +import org.junit.Test;
            +import org.junit.runner.RunWith;
            +import org.mockito.invocation.InvocationOnMock;
            +import org.mockito.stubbing.Answer;
            +import org.powermock.core.classloader.annotations.PrepareForTest;
            +import org.powermock.modules.junit4.PowerMockRunner;
            +
            +import java.util.ArrayList;
            +import java.util.Collections;
            +import java.util.HashMap;
            +import java.util.List;
            +import java.util.Map;
            +import java.util.Properties;
            +import java.util.concurrent.CountDownLatch;
            +import java.util.concurrent.ExecutorService;
            +import java.util.concurrent.Executors;
            +import java.util.concurrent.TimeUnit;
            +import java.util.concurrent.atomic.AtomicReference;
            +
            +import static org.junit.Assert.assertFalse;
            +import static org.mockito.Mockito.anyLong;
            +import static org.powermock.api.mockito.PowerMockito.doAnswer;
            +import static org.powermock.api.mockito.PowerMockito.mock;
            +import static org.powermock.api.mockito.PowerMockito.when;
            +import static org.powermock.api.mockito.PowerMockito.whenNew;
            +
            +/**
+ * Unit tests for the {@link Flink9349Test}.
+ */
            +@RunWith(PowerMockRunner.class)
            +@PrepareForTest(KafkaConsumerThread.class)
            +public class Flink9349Test {
            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            — End diff –

Do you mean to have such a test as a separate method in each KafkaXYFetcherTest class?

            githubbot ASF GitHub Bot added a comment -

            Github user snuyanzin commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r189168596

            — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java —
            @@ -0,0 +1,207 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.streaming.connectors.kafka.internal;
            +
            +import org.apache.flink.api.common.serialization.SimpleStringSchema;
            +import org.apache.flink.core.testutils.MultiShotLatch;
            +import org.apache.flink.core.testutils.OneShotLatch;
            +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
            +import org.apache.flink.streaming.api.functions.source.SourceFunction;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
            +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
            +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
            +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
            +
            +import org.apache.kafka.clients.consumer.ConsumerRecords;
            +import org.apache.kafka.clients.consumer.KafkaConsumer;
            +import org.junit.Test;
            +import org.junit.runner.RunWith;
            +import org.mockito.invocation.InvocationOnMock;
            +import org.mockito.stubbing.Answer;
            +import org.powermock.core.classloader.annotations.PrepareForTest;
            +import org.powermock.modules.junit4.PowerMockRunner;
            +
            +import java.util.ArrayList;
            +import java.util.Collections;
            +import java.util.HashMap;
            +import java.util.List;
            +import java.util.Map;
            +import java.util.Properties;
            +import java.util.concurrent.CountDownLatch;
            +import java.util.concurrent.ExecutorService;
            +import java.util.concurrent.Executors;
            +import java.util.concurrent.TimeUnit;
            +import java.util.concurrent.atomic.AtomicReference;
            +
            +import static org.junit.Assert.assertFalse;
            +import static org.mockito.Mockito.anyLong;
            +import static org.powermock.api.mockito.PowerMockito.doAnswer;
            +import static org.powermock.api.mockito.PowerMockito.mock;
            +import static org.powermock.api.mockito.PowerMockito.when;
            +import static org.powermock.api.mockito.PowerMockito.whenNew;
            +
            +/**
+ * Unit tests for the {@link Flink9349Test}.
+ */
            +@RunWith(PowerMockRunner.class)
            +@PrepareForTest(KafkaConsumerThread.class)
            +public class Flink9349Test {
            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            +
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // to synchronize when the consumer is in its blocking method
            + final OneShotLatch sync = new OneShotLatch();
            +
            + // ----- the mock consumer with blocking poll calls ----
            + final MultiShotLatch blockerLatch = new MultiShotLatch();
            +
            + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
            + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
            +
            + @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
            +
            + doAnswer(new Answer<Void>() {
            + @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
            +
            + // make sure the fetcher creates the mock consumer
            + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceFunction.SourceContext<String> sourceContext = mock(SourceFunction.SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
            +
            + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic watermark extractor */
            + null, /* punctuated watermark extractor */
            + new TestProcessingTimeService(),
            + 10, /* watermark interval */
            + this.getClass().getClassLoader(),
            + "task_name",
            + schema,
            + new Properties(),
            + 0L,
            + new UnregisteredMetricsGroup(),
            + new UnregisteredMetricsGroup(),
            + false);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 2;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner ") {
            + @Override
            + public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
            + });
            + for (int i = 0; i < fetchTasks; i++) {
            + service.submit(new Thread("add partitions " + i) {
            +
            + @Override
            + public void run() {
            + try {
            + List<KafkaTopicPartition> newPartitions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ newPartitions.add(testPartition);
+ }
+ fetcher.addDiscoveredPartitions(newPartitions);
+ latch.countDown();
+ //latch.await();
+ for (int i = 0; i < 100; i++) {
+ fetcher.addDiscoveredPartitions(newPartitions);
+ Thread.sleep(1L);
+ }
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
            + });
            + }
            +
            + service.awaitTermination(1L, TimeUnit.SECONDS);
            +
            + // wait until the fetcher has reached the method of interest
            + sync.await();
            +
            + // ----- trigger the offset commit -----
            — End diff –

Thank you, you are right. I removed it and some other unneeded stuff from this test.

            githubbot ASF GitHub Bot added a comment -

            Github user snuyanzin commented on the issue:

            https://github.com/apache/flink/pull/6040

@tzulitai, @tedyu thank you for your review, comments and contribution tips.
I made updates, which include moving the test into AbstractFetcherTest and making it Kafka connector version independent.

            Could you please help me a bit?
Unexpectedly, the Travis build failed on YARNSessionCapacitySchedulerITCase (only on Flink's Travis; on my fork it passed several times). It does not look like a result of my changes, as there is nothing related to YARN in them. Anyway, I tried to investigate it and found several similar issues in JIRA, but they are closed.

I also downloaded the logs mentioned in the failed Travis job:

            > Uploading to transfer.sh
            https://transfer.sh/JspTz/24547.10.tar.gz

Based on them, it looks like there was a connectivity issue with one of the ApplicationMasters, as the log yarn-tests/container_1526608500321_0007_01_000001/job-manager.log is full of:
            > 2018-05-18 01:56:49,448 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980
            2018-05-18 01:56:49,449 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4:43980]] Caused by: [Connection refused: travis-job-2a2afdc5-7bf8-4597-946e-16551a5ebbc4/127.0.1.1:43980]

A very strange thing:

            > Remote connection to [null]

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on the issue:

            https://github.com/apache/flink/pull/6040

@snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a bit flaky, so you can safely ignore that for now. I'll take another look at your changes soon. Thanks!

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190111078

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            +
            + final TestFetcher<String> fetcher = new TestFetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic assigner */
            + null, /* punctuated assigner */
            + new TestProcessingTimeService(),
            + 10);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            — End diff –

            Flink provides a `CheckedThread` utility so you don't have to do this thread error referencing.
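For context, `CheckedThread` lives in `flink-core`'s test utilities (`org.apache.flink.core.testutils.CheckedThread`): the work goes into `go()`, and `sync()` joins the thread and rethrows anything `go()` threw, so a background failure fails the test directly. A minimal sketch, assuming JUnit 4 and the Flink test utilities are on the classpath (the failing body is purely illustrative):

```
import org.apache.flink.core.testutils.CheckedThread;
import org.junit.Test;

public class CheckedThreadSketch {

	@Test(expected = IllegalStateException.class)
	public void backgroundFailurePropagatesToSync() throws Exception {
		CheckedThread worker = new CheckedThread() {
			@Override
			public void go() throws Exception {
				// runs on the background thread
				throw new IllegalStateException("boom");
			}
		};
		worker.start();
		// joins the thread and rethrows the IllegalStateException here
		worker.sync();
	}
}
```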

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190110928

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            — End diff –

It is unnecessary to use a PowerMock mock here. A dummy implementation of a `SourceContext` would be better.
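A minimal sketch of what such a dummy could look like (the class name `DummySourceContext` is made up for illustration; the methods are those of `SourceFunction.SourceContext` in the Flink versions discussed here):

```
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;

// A no-op SourceContext: enough for tests where the fetcher only needs
// somewhere to emit records, with no mocking framework involved.
class DummySourceContext<T> implements SourceContext<T> {
	private final Object checkpointLock = new Object();

	@Override
	public void collect(T element) {
		// records are intentionally dropped
	}

	@Override
	public void collectWithTimestamp(T element, long timestamp) {
		// records are intentionally dropped
	}

	@Override
	public void emitWatermark(Watermark mark) {
		// watermarks are intentionally dropped
	}

	@Override
	public void markAsTemporarilyIdle() {
	}

	@Override
	public Object getCheckpointLock() {
		return checkpointLock;
	}

	@Override
	public void close() {
	}
}
```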

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190111239

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            +
            + final TestFetcher<String> fetcher = new TestFetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic assigner */
            + null, /* punctuated assigner */
            + new TestProcessingTimeService(),
            + 10);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 5;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner") {
            + @Override
            + public void run() {
+ try {
+ latch.await();
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
            + });
            +
            + for (int i = 0; i < fetchTasks; i++) {
            + service.submit(new Thread("add partitions " + i) {
            + @Override
            + public void run() {
            + try {
            + List<KafkaTopicPartition> newPartitions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ newPartitions.add(testPartition);
+ }
+ fetcher.addDiscoveredPartitions(newPartitions);
+ latch.countDown();
+ for (int i = 0; i < 100; i++) {
+ fetcher.addDiscoveredPartitions(newPartitions);
+ Thread.sleep(1L);
+ }
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
            + });
            + }
            +
            + service.awaitTermination(1L, TimeUnit.SECONDS);
            +
            + // ----- trigger the offset commit -----
            — End diff –

            We should be able to ignore offset commit triggering in this test

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190114844

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            +
            + final TestFetcher<String> fetcher = new TestFetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic assigner */
            + null, /* punctuated assigner */
            + new TestProcessingTimeService(),
            + 10);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 5;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner") {
            + @Override
            + public void run() {
            + try {
            + latch.await();
            + fetcher.runFetchLoop();
            — End diff –

            The sequence here seems a bit odd to me.

            I think we should be testing this as follows:
            1. Run the fetch loop, and let it be blocked on record emitting (which then should let it be blocked mid-iteration)
            2. Add a discovered partition; this should not throw an exception.

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190112933

            — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
            @@ -507,7 +507,11 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

- List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
+ /**
+ * CopyOnWrite as adding discovered partitions could happen in parallel
+ * with different threads iterating by {@link AbstractFetcher#subscribedPartitionStates} results
+ */

— End diff –

            I think we usually don't have Javadoc blocks within methods. A regular comment with `//` would do.

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190120209

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            +
            + final TestFetcher<String> fetcher = new TestFetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic assigner */
            + null, /* punctuated assigner */
            + new TestProcessingTimeService(),
            + 10);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 5;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner") {
            + @Override
            + public void run() {
            + try {
            + latch.await();
            + fetcher.runFetchLoop();
            — End diff –

            The final `checkedThread.sync()` would always fail with the `ConcurrentModificationException` if the test is designed like this.

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190111529

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -416,9 +520,16 @@ protected TestFetcher(
            false);
            }

+ /**
+ * Emulation of partition's iteration which is required for
+ * {@link AbstractFetcherTest#testConcurrentPartitionsDiscoveryAndLoopFetching}.
+ * @throws Exception
+ */
            @Override
            public void runFetchLoop() throws Exception {

- throw new UnsupportedOperationException();
+ for (KafkaTopicPartitionState ignored: subscribedPartitionStates()) {
+ Thread.sleep(10L);

— End diff –

            This would only let the test fail "occasionally", right?
I would like this to be changed, so that the test always fails without the copy-on-write fix.
            We could do this by having a dummy source context that blocks on record emit.
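Sketching that suggestion, and reusing the hypothetical `DummySourceContext` from the earlier sketch: `collect()` first signals that the fetch loop has reached record emission (so it is mid-iteration), then parks until the test releases it, which makes the interleaving deterministic rather than occasional.

```
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class BlockingContextSketch {

	static SourceContext<String> blockingOnEmit(OneShotLatch inEmit, OneShotLatch release) {
		return new DummySourceContext<String>() {
			@Override
			public void collect(String element) {
				inEmit.trigger(); // the fetch loop is now mid-iteration
				try {
					release.await(); // park until the test lets the loop continue
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		};
	}
}
```

The test thread then awaits `inEmit`, calls `fetcher.addDiscoveredPartitions(...)` while the iteration is parked, and finally triggers `release`; without the copy-on-write fix the iterator fails every run, and with it the call is safe.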

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6040#discussion_r190120134

            — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java —
            @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
            assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
            }

            + @Test
            + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
            + // test data
            + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
            +
            + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
            + testCommitData.put(testPartition, 11L);
            +
            + // ----- create the test fetcher -----
            +
            + @SuppressWarnings("unchecked")
            + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
            + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
            + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            +
            + final TestFetcher<String> fetcher = new TestFetcher<>(
            + sourceContext,
            + partitionsWithInitialOffsets,
            + null, /* periodic assigner */
            + null, /* punctuated assigner */
            + new TestProcessingTimeService(),
            + 10);
            +
            + // ----- run the fetcher -----
            +
            + final AtomicReference<Throwable> error = new AtomicReference<>();
            + int fetchTasks = 5;
            + final CountDownLatch latch = new CountDownLatch(fetchTasks);
            + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
            +
            + service.submit(new Thread("fetcher runner") {
            + @Override
            + public void run() {
            + try {
            + latch.await();
            + fetcher.runFetchLoop();
            — End diff –

            So, IMO, the test should look something like this:

            ```
            final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
            final OneShotLatch stateIterationBlockLatch = new OneShotLatch();

            final TestFetcher<String> fetcher = new TestFetcher<>(
            sourceContext,
            partitionsWithInitialOffsets,
            null, /* periodic assigner */
            null, /* punctuated assigner */
            new TestProcessingTimeService(),
            10,
            fetchLoopWaitLatch,
            stateIterationBlockLatch);

            // ----- run the fetcher -----

            final CheckedThread checkedThread = new CheckedThread() {
            @Override
public void go() throws Exception {
fetcher.runFetchLoop();
}
};
            checkedThread.start();

            // wait until state iteration begins before adding discovered partitions
            fetchLoopWaitLatch.await();
            fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));

            stateIterationBlockLatch.trigger();
            checkedThread.sync();
            ```

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/flink/pull/6040


            tzulitai Tzu-Li (Gordon) Tai added a comment -

            Merged. Fixed via:

            1.6.0 - 049994274c9d4fc07925a7639e4044506b090d10
            1.5.1 - bc4a402d09304a21c82299e368442c8a6e4ae427
            1.4.3 - 61c44d902d081ad5bf0e1654f62f70567d25fde8

            githubbot ASF GitHub Bot added a comment -

            Github user snuyanzin commented on the issue:

            https://github.com/apache/flink/pull/6040

            @tzulitai thank you for your review and comments.
            Based on your comments I have a question; could you please clarify it?

            You mentioned Flink's `OneShotLatch` and `CheckedThread`, while some of the Kafka connector's tests use `AtomicReference`, `Thread`, etc. (I used one of them as an example while writing my version of the test). Just to be on the safe side: am I right that `OneShotLatch` and `CheckedThread` are preferable in tests, or are there some rules/limitations?

            githubbot ASF GitHub Bot added a comment -

            Github user tzulitai commented on the issue:

            https://github.com/apache/flink/pull/6040

            Using `CheckedThread` is preferable, as it simplifies some of the test code.
            But yes, the utility was introduced at a later point in Flink, so some parts of the test code might still be using `Thread`s and `AtomicReference`s.

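            To make that difference concrete, here is a sketch of the two patterns side by side; the test interactions are placeholders, and only the `go()`/`sync()` contract of `CheckedThread` and the manual error hand-off via `AtomicReference` are taken as given.

            ```
            // Older pattern: failures in the worker thread must be captured by
            // hand and rethrown by the test after join().
            final AtomicReference<Throwable> error = new AtomicReference<>();
            Thread runner = new Thread(() -> {
                try {
                    fetcher.runFetchLoop();
                } catch (Throwable t) {
                    error.set(t);
                }
            });
            runner.start();
            // ... test interactions ...
            runner.join();
            if (error.get() != null) {
                throw new AssertionError(error.get());
            }

            // Preferred pattern: CheckedThread records any exception thrown in
            // go() and rethrows it from sync(), so no manual bookkeeping is needed.
            CheckedThread checked = new CheckedThread() {
                @Override
                public void go() throws Exception {
                    fetcher.runFetchLoop();
                }
            };
            checked.start();
            // ... test interactions ...
            checked.sync();
            ```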
            pnowojski Piotr Nowojski added a comment -

            Because of the cancelled 1.5.0 RC5, this fix will make it into 1.5.0 RC6. FYI tzulitai, you might want to run some additional tests for 1.5.0 RC6 to make sure that everything works as expected, because this bug fix was not included in the normal release testing, and a lot of voters will probably just carry their votes over to the new RC without testing RC6.


            tzulitai Tzu-Li (Gordon) Tai added a comment -

            pnowojski thanks for the reminder. Yes, I will test this with RC6.


            People

              Assignee: Sergey Nuyanzin
              Reporter: Vishal Santoshi
              Votes: 0
              Watchers: 7
