FLINK-4702

Kafka consumer must commit offsets asynchronously

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.2
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      The offset commit calls to Kafka may occasionally take very long.
In that case, the notifyCheckpointComplete() method blocks for a long time, and the KafkaConsumer cannot make progress and cannot perform checkpoints.

Kafka 0.9+ provides methods to commit offsets asynchronously.
We should use those and make sure that no more than one commit is concurrently in progress, so that commit requests do not pile up.
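
      A minimal sketch of that guard, mirroring the diff discussed below (class and method names are illustrative, not the actual implementation):

      import java.util.Map;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.clients.consumer.OffsetCommitCallback;
      import org.apache.kafka.common.TopicPartition;

      class GuardedOffsetCommitter {

          private final KafkaConsumer<?, ?> consumer;

          /** Guard so that at most one offset commit is in flight at a time */
          private volatile boolean commitInProgress;

          /** Resets the guard once the broker has answered */
          private final OffsetCommitCallback callback = new OffsetCommitCallback() {
              @Override
              public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                  commitInProgress = false;
              }
          };

          GuardedOffsetCommitter(KafkaConsumer<?, ?> consumer) {
              this.consumer = consumer;
          }

          void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
              if (!commitInProgress) {
                  commitInProgress = true;
                  // returns immediately; the callback fires when the commit completes
                  consumer.commitAsync(offsetsToCommit, callback);
              }
              // otherwise skip: the next checkpoint carries more recent offsets anyway
          }
      }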


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/2559

FLINK-4702 [kafka connector] Commit offsets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take very long. In that case, the notifyCheckpointComplete() method blocks for a long time and the KafkaConsumer cannot make progress and cannot perform checkpoints.

          This pull request changes the offset committing to use Kafka's `commitAsync()` method.
It also makes sure that no more than one commit is concurrently in progress, so that commit requests do not pile up.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2559.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2559


          commit eafba8600863c18e09397366485bcfc6ff44960f
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-09-27T18:59:35Z

FLINK-4702 [kafka connector] Commit offsets to Kafka asynchronously


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2559

          @robert and @tzulitai What is your take on this?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2559

Just had a look at the API of `commitAsync`, and it seems like the offsets committed back to Kafka through this API (likewise for `commitSync`) need to be `lastProcessedMessageOffset + 1` (see https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)).

This mainly affects starting from group offsets in Kafka: `FlinkKafkaConsumer09` currently starts from the wrong offset. There's a separate JIRA for this bug: FLINK-4618 (https://issues.apache.org/jira/browse/FLINK-4618).

          Another contributor had already picked up FLINK-4618, so I'd say it's ok to leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it gets merged after this PR.

          Minus the above, this looks good to me. +1
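
          Concretely, the `+ 1` above means the committed map has to point at the next record to read, roughly like this (a fragment; variable names are illustrative, and the `KafkaTopicPartition` getters are assumed from Flink's class):

          Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
          for (Map.Entry<KafkaTopicPartition, Long> entry : lastProcessedOffsets.entrySet()) {
              TopicPartition tp = new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition());
              // Kafka expects the offset of the next message to consume, hence the + 1
              offsetsToCommit.put(tp, new OffsetAndMetadata(entry.getValue() + 1));
          }
          consumer.commitAsync(offsetsToCommit, offsetCommitCallback);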

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r80903481

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -86,6 +90,9 @@
          /** Flag to mark the main work loop as alive */
          private volatile boolean running = true;

          + /** Flag indicating whether a commit of offsets to Kafka it currently happening */
          — End diff –

          nit: it --> "is"?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2559

Btw, just curious, does the 0.8 Kafka connector have the same issue with sync committing? I haven't looked into the code for this, but I'm just wondering if we need a ticket for 0.8 too.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2559

          @StephanEwen I think you've tagged the wrong Github ID for Robert

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r80906814

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
          }
          return result;
          }
          +
+ private class CommitCallback implements OffsetCommitCallback {
+
+     @Override
+     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+         commitInProgress = false;
+
+         if (exception != null) {
+             LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
          — End diff –

          The exception message isn't included in the log warning.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r80907326

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)

  if (this.consumer != null) {
      synchronized (consumerLock) {
-         this.consumer.commitSync(offsetsToCommit);
+         if (!commitInProgress) {
+             commitInProgress = true;
+             this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+         }
+         else {
+             LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +

— End diff –

If the user sets a relatively short checkpoint interval, will this flood the log?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2559

Thanks @tzulitai for looking at this. I will leave the offset as it is then (to be fixed via the follow-up).

The Kafka 0.8 connector needs a similar change. This one was encountered by a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 0.8. Will also correct the issue tag.

          I have no good idea how to test this, though, so any thoughts there are welcome!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r80909904

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)

  if (this.consumer != null) {
      synchronized (consumerLock) {
-         this.consumer.commitSync(offsetsToCommit);
+         if (!commitInProgress) {
+             commitInProgress = true;
+             this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+         }
+         else {
+             LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +

— End diff –

          Possibly yes. But on the other hand, this should be pretty visible if it happens.
          I would expect that with proper options to participate in group checkpoint committing, most Flink jobs run without committing to Kafka/ZooKeeper.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2559

          @StephanEwen
          On a second look, I think the `commitSpecificOffsetsToKafka` method was designed to commit synchronously in the first place. `AbstractFetcher` holds a Map of all current pending offsets for committing by checkpointID, and on every `notifyCheckpointComplete` the offsets are removed from the Map before `commitSpecificOffsetsToKafka` is called.

          So, for async committing, I think we need to remove cleaning up the offsets in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a new separate callback handle method in `AbstractFetcher`.
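
          A sketch of that idea (the method name and the way the pending map is keyed are hypothetical, not existing Flink API):

          // hypothetical callback hook in AbstractFetcher: pending offsets are removed
          // only after the async commit has actually completed, instead of eagerly
          // in notifyCheckpointComplete()
          protected void onOffsetCommitComplete(long checkpointId, Exception exception) {
              if (exception == null) {
                  pendingOffsetsToCommit.remove(checkpointId);
              }
          }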

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r80914495

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)

  if (this.consumer != null) {
      synchronized (consumerLock) {
-         this.consumer.commitSync(offsetsToCommit);
+         if (!commitInProgress) {
+             commitInProgress = true;
+             this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+         }
+         else {
+             LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +

— End diff –

          I agree, makes sense.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2559

Seems like currently only the 0.8 Kafka connector has tests related to offset committing (in `Kafka08ITCase`).

My two cents for testing this for now: an IT test for correct offset committing back to Kafka in the 0.9 connector is sufficient (can take a look at `Kafka08ITCase#testOffsetInZookeeper`, but replacing `ZookeeperOffsetHandler` with the new `KafkaConsumer` methods).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2559#discussion_r81089161

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
          }
          return result;
          }
          +
+ private class CommitCallback implements OffsetCommitCallback {
+
+     @Override
+     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+         commitInProgress = false;
+
+         if (exception != null) {
+             LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
          — End diff –

          Oops, this is actually correct, sorry.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2559

@tzulitai Thanks for the thorough review!

I don't see why the `commitSpecificOffsetsToKafka` method would have to commit synchronously. The `FlinkKafkaConsumerBase` has the pending checkpoints (I think that is what you refer to). It removes the HashMap of "offsets to commit" from the `pendingCheckpoints` Map synchronously, before even calling the fetcher to commit.
After that, it looks to me like it does not make a difference whether that "offsets to commit" Map is used synchronously or asynchronously...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2559

Actually, just discovered that the problem is different altogether.

While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data arrives from Kafka, the lock is not released before the poll timeout is over.
During that time, neither a "commitSync" nor a "commitAsync" call can be fired off. The `notifyCheckpointComplete` method hence blocks until the poll timeout is over and the lock is released.

          We can fix this by making sure that the consumer is "woken up" to release the lock, and by making sure the lock acquisition is fair, so the committer will get it next.

          For the sake of releasing the lock fast in the committer method, it should still be an asynchronous commit.
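
          `KafkaConsumer.wakeup()` is the lever for this: it is safe to call from another thread and makes a blocking `poll()` throw a `WakeupException`, so the committer does not have to wait out the poll timeout. Roughly (a sketch of the mechanism, not the actual patch):

          // fetcher thread
          try {
              ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
              // ... emit records ...
          }
          catch (WakeupException e) {
              // poll was interrupted so that a pending offset commit can acquire the lock
          }

          // committer thread (e.g. from notifyCheckpointComplete())
          consumer.wakeup(); // thread-safe; breaks the fetcher out of its blocking poll()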

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/2574

          FLINK-4702 [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

          This fix is quite critical!

While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data arrives from Kafka, the lock is not released before the poll timeout is over.

During that time, no offset commit can make progress, because it needs the consumer lock. The `notifyCheckpointComplete()` method of the Kafka Consumer hence blocks until the poll timeout is over and the lock is released. For low-throughput Kafka topics, this can cause wildly long checkpoint delays.

This changes `notifyCheckpointComplete()` to only "schedule" the offsets to be committed, while the main fetcher thread actually kicks off the asynchronous offset commits. That way, there is no interference between the `notifyCheckpointComplete()` method (which is executed under the checkpoint lock) and the consumer lock.

          In fact, the only KafkaConsumer method accessed concurrently to the main fetcher thread is `wakeup()` which is actually thread-safe (where the rest of the KafkaConsumer is not). The consumer lock was hence completely removed.
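
          A sketch of that hand-off (field and method names are illustrative; the real `commitSpecificOffsetsToKafka` signature differs):

          // written by the checkpoint thread, consumed by the fetcher thread
          private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
                  new AtomicReference<>();

          // checkpoint thread: only publish the request and wake the consumer
          void scheduleOffsetCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
              nextOffsetsToCommit.set(offsets);
              consumer.wakeup(); // the only KafkaConsumer call made outside the fetcher thread
          }

          // fetcher thread, at the top of its main loop:
          Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
          if (toCommit != null) {
              consumer.commitAsync(toCommit, offsetCommitCallback);
          }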

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink kafka_09_fix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2574.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2574


          commit 0846fd907db7d52d7e5fb7d704c5e1c13462e331
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-09-29T16:09:51Z

          FLINK-4702 [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

          Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take
          very long. This is mostly relevant for low-throughput Kafka topics.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/2559

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2559

          Closing this for #2574

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2574

          @tzulitai @rmetzger We need to make sure that the Kafka 0.10 code picks up this change and the test case.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81277067

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          +
          + @Test
          + public void testCommitDoesNotBlock() throws Exception {
          +
          + // test data
          + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
          + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
          + testCommitData.put(testPartition, 11L);
          +
          + // to synchronize when the consumer is in its blocking method
          + final OneShotLatch sync = new OneShotLatch();
          +
          + // ----- the mock consumer with blocking poll calls ----
          + final MultiShotLatch blockerLatch = new MultiShotLatch();
          +
          + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
          + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
          +
          + @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+     sync.trigger();
+     blockerLatch.await();
+     return ConsumerRecords.empty();
+ }
+ });
          +
          + doAnswer(new Answer<Void>() {
          + @Override
+ public Void answer(InvocationOnMock invocation) {
+     blockerLatch.trigger();
+     return null;
+ }
+ }).when(mockConsumer).wakeup();
          +
          + // make sure the fetcher creates the mock consumer
          + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
          +
          + // ----- create the test fetcher -----
          +
          + @SuppressWarnings("unchecked")
          + SourceContext<String> sourceContext = mock(SourceContext.class);
          + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
          + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
          + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
          +
          + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
          + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
          +
          + // ----- run the fetcher -----
          +
          + final AtomicReference<Throwable> error = new AtomicReference<>();
          + final Thread fetcherRunner = new Thread("fetcher runner") {
          +
          + @Override
+ public void run() {
+     try {
+         fetcher.runFetchLoop();
+     } catch (Throwable t) {
+         error.set(t);
+     }
+ }
+ };
          + fetcherRunner.start();
          +
          + // wait until the fetcher has reached the method of interest
          + sync.await();
          +
          + // ----- trigger the offset commit -----
          +
          + final AtomicReference<Throwable> commitError = new AtomicReference<>();
          + final Thread committer = new Thread("committer runner") {
          + @Override
+ public void run() {
+     try {
+         fetcher.commitSpecificOffsetsToKafka(testCommitData);
+     } catch (Throwable t) {
+         commitError.set(t);
+     }
+ }
+ };
          + committer.start();
          +
          + // ----- ensure that the committer finishes in time -----
          + committer.join(30000);
          + assertFalse("The committer did not finish in time", committer.isAlive());
          +
          + // ----- test done, wait till the fetcher is done for a clean shutdown -----
          + fetcher.cancel();
          + fetcherRunner.join();
          +
          + // check that there were no errors in the fetcher
          + final Throwable caughtError = error.get();
+ if (caughtError != null) {
+     throw new Exception("Exception in the fetcher", caughtError);
+ }

          — End diff –

          Might as well also check `commitError`?
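
          Mirroring the fetcher-error check just above, something like:

          + final Throwable caughtCommitError = commitError.get();
          + if (caughtCommitError != null) {
          +     throw new Exception("Exception in the committer", caughtCommitError);
          + }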

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81282102

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          +
          + @Test
          + public void testCommitDoesNotBlock() throws Exception {
          +
          + // test data
          + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
          + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
          + testCommitData.put(testPartition, 11L);
          +
          + // to synchronize when the consumer is in its blocking method
          + final OneShotLatch sync = new OneShotLatch();
          +
          + // ----- the mock consumer with blocking poll calls ----
          + final MultiShotLatch blockerLatch = new MultiShotLatch();
          +
          + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
          + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
          +
          + @Override
          + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+     sync.trigger();
+     blockerLatch.await();
          — End diff –

We are not ensuring that `blockerLatch` is released here, correct? Like my comment above, perhaps we should check that, to ensure that `wakeup` is called in `commitSpecificOffsetsToKafka`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81277361

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
          + * Unit tests for the {@link Kafka09Fetcher}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          — End diff –

          Should we also add a test to make sure that `wakeup()` is immediately called on the `KafkaConsumer` in `commitSpecificOffsetsToKafka`? Otherwise we are not ensuring the behaviour of "committing offsets back to Kafka on checkpoints".

          Perhaps this can be integrated into `testCommitDoesNotBlock()`.
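          For instance, a minimal sketch of such an assertion (hypothetical; it assumes the `mockConsumer` and `fetcher` from `testCommitDoesNotBlock()` and is not part of the PR as posted):

              // after fetcher.commitSpecificOffsetsToKafka(testCommitData) has been triggered,
              // verify that the blocking poll() was actually interrupted
              Mockito.verify(mockConsumer, Mockito.atLeastOnce()).wakeup();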

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81275213

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
          }
          }

          - if (this.consumer != null) {
          -     synchronized (consumerLock) {
          -         this.consumer.commitSync(offsetsToCommit);
          -     }
          + if (commitInProgress) {
          +     LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
          +             "Some checkpoints may be subsumed before committed. " +
          +             "This does not compromise Flink's checkpoint integrity.");
          + }

          — End diff –

          Would it make sense to simply move this warning to the `if (toCommit != null && !commitInProgress)` block in the main thread? That is where `commitInProgress` actually determines whether or not the current offsets to commit will be dropped. Also, the actual commit should happen right after anyway because of `consumer.wakeup()`, so I don't see the purpose of an eager warning here.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81302867

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
          + * Unit tests for the {@link Kafka09Fetcher}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          +
          + @Test
          + public void testCommitDoesNotBlock() throws Exception {
          +
          + // test data
          + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
          + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
          + testCommitData.put(testPartition, 11L);
          +
          + // to synchronize when the consumer is in its blocking method
          + final OneShotLatch sync = new OneShotLatch();
          +
          + // ----- the mock consumer with blocking poll calls ----
          + final MultiShotLatch blockerLatch = new MultiShotLatch();
          +
          + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
          + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
          +
          + @Override
          + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
          +     sync.trigger();
          +     blockerLatch.await();
          +     return ConsumerRecords.empty();
          + }
          + });
          +
          + doAnswer(new Answer<Void>() {
          + @Override
          + public Void answer(InvocationOnMock invocation) {
          +     blockerLatch.trigger();
          +     return null;
          + }
          + }).when(mockConsumer).wakeup();
          +
          + // make sure the fetcher creates the mock consumer
          + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
          +
          + // ----- create the test fetcher -----
          +
          + @SuppressWarnings("unchecked")
          + SourceContext<String> sourceContext = mock(SourceContext.class);
          + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
          + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
          + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
          +
          + final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
          + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
          +
          + // ----- run the fetcher -----
          +
          + final AtomicReference<Throwable> error = new AtomicReference<>();
          + final Thread fetcherRunner = new Thread("fetcher runner") {
          +
          + @Override
          + public void run() {
          + try {
          +     fetcher.runFetchLoop();
          + } catch (Throwable t) {
          +     error.set(t);
          + }
          + }
          + };
          + fetcherRunner.start();
          +
          + // wait until the fetcher has reached the method of interest
          + sync.await();
          +
          + // ----- trigger the offset commit -----
          +
          + final AtomicReference<Throwable> commitError = new AtomicReference<>();
          + final Thread committer = new Thread("committer runner") {
          + @Override
          + public void run() {
          + try {
          +     fetcher.commitSpecificOffsetsToKafka(testCommitData);
          + } catch (Throwable t) {
          +     commitError.set(t);
          + }

          + }
          + };
          + committer.start();
          +
          + // ----- ensure that the committer finishes in time -----
          + committer.join(30000);
          + assertFalse("The committer did not finish in time", committer.isAlive());
          +
          + // ----- test done, wait till the fetcher is done for a clean shutdown -----
          + fetcher.cancel();
          + fetcherRunner.join();
          +
          + // check that there were no errors in the fetcher
          + final Throwable caughtError = error.get();
          + if (caughtError != null) {
          +     throw new Exception("Exception in the fetcher", caughtError);
          + }

          — End diff –

          Yes, should and will do that!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81303182

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
          }
          }

          - if (this.consumer != null) {
          -     synchronized (consumerLock) {
          -         this.consumer.commitSync(offsetsToCommit);
          -     }
          + if (commitInProgress) {
          +     LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
          +             "Some checkpoints may be subsumed before committed. " +
          +             "This does not compromise Flink's checkpoint integrity.");
          + }

          — End diff –

          I would not want to put it into the main loop, because then the warning would come repeatedly, every time the poll happens.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81303450

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -283,10 +296,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
          }
          }

          - if (this.consumer != null) {
          -     synchronized (consumerLock) {
          -         this.consumer.commitSync(offsetsToCommit);
          -     }
          + if (commitInProgress) {
          +     LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
          +             "Some checkpoints may be subsumed before committed. " +
          +             "This does not compromise Flink's checkpoint integrity.");
          + }

          — End diff –

          Also, in both cases, you cannot know whether it is dropped; it may still be that only one commit was delayed and the "toCommit" data will actually be picked up, but with a delay.

          I have an idea, though, for how we can check that properly.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81303617

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
          + * Unit tests for the {@link Kafka09Fetcher}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          — End diff –

          It is already integrated in the test, actually. Since the mock consumer's `poll()` call blocks forever and only wakes up on `wakeup()`, the test fails if `wakeup()` is not called.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2574#discussion_r81303671

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
          @@ -0,0 +1,300 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka;
          +
          +import org.apache.flink.core.testutils.MultiShotLatch;
          +import org.apache.flink.core.testutils.OneShotLatch;
          +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
          +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
          +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
          +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
          +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
          +import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
          +
          +import org.apache.kafka.clients.consumer.ConsumerRecords;
          +import org.apache.kafka.clients.consumer.KafkaConsumer;
          +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          +import org.apache.kafka.clients.consumer.OffsetCommitCallback;
          +import org.apache.kafka.common.TopicPartition;
          +
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +
          +import org.mockito.Mockito;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Map.Entry;
          +import java.util.Properties;
          +import java.util.concurrent.BlockingQueue;
          +import java.util.concurrent.LinkedBlockingQueue;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +
          +import static org.mockito.Mockito.any;
          +import static org.mockito.Mockito.anyLong;
          +import static org.powermock.api.mockito.PowerMockito.doAnswer;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +import static org.powermock.api.mockito.PowerMockito.whenNew;
          +
          +/**
          + * Unit tests for the {@link Kafka09Fetcher}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(Kafka09Fetcher.class)
          +public class Kafka09FetcherTest {
          +
          + @Test
          + public void testCommitDoesNotBlock() throws Exception {
          +
          + // test data
          + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
          + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
          + testCommitData.put(testPartition, 11L);
          +
          + // to synchronize when the consumer is in its blocking method
          + final OneShotLatch sync = new OneShotLatch();
          +
          + // ----- the mock consumer with blocking poll calls ----
          + final MultiShotLatch blockerLatch = new MultiShotLatch();
          +
          + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
          + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
          +
          + @Override
          + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
          + sync.trigger();
          + blockerLatch.await();
          — End diff –

          See above.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2574

          Thanks for the review. I think some points you mentioned are already addressed, actually.

          Will add the following:

          • Make `commitSpecificOffsets()` determine precisely when a commit request is subsumed by a new one.
          • Extend the test to catch errors from the committer thread.
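
          For the second point, a minimal sketch (hypothetical; it mirrors the existing fetcher error check in `testCommitDoesNotBlock()`, where `commitError` is captured but never asserted):

              // surface any error captured by the committer thread
              final Throwable caughtCommitError = commitError.get();
              if (caughtCommitError != null) {
                  throw new Exception("Exception in the committer", caughtCommitError);
              }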
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/2574

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2574

          Merged in 92f4539afc714f7dbd293c3ad677b3b5807c6911

          Addresses @tzulitai's comments. Fixed the warning log message by using an atomic swap when setting the next offsets to be committed: if the swap returns a non-null value, that value is skipped and subsumed by the newer offsets.
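
          A minimal sketch of that atomic-swap pattern (illustrative only; the class and field names `OffsetCommitter` and `nextOffsetsToCommit` are assumptions, not the merged code — see commit 92f4539 for the actual change):

              import java.util.Map;
              import java.util.concurrent.atomic.AtomicReference;

              import org.apache.kafka.clients.consumer.KafkaConsumer;
              import org.apache.kafka.clients.consumer.OffsetAndMetadata;
              import org.apache.kafka.clients.consumer.OffsetCommitCallback;
              import org.apache.kafka.common.TopicPartition;
              import org.slf4j.Logger;
              import org.slf4j.LoggerFactory;

              class OffsetCommitter {

                  private static final Logger LOG = LoggerFactory.getLogger(OffsetCommitter.class);

                  // latest offsets requested for commit; a non-null value swapped out by
                  // getAndSet() is a request that was never picked up, i.e. subsumed
                  private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
                          new AtomicReference<>();

                  private final KafkaConsumer<?, ?> consumer;
                  private volatile boolean commitInProgress;

                  OffsetCommitter(KafkaConsumer<?, ?> consumer) {
                      this.consumer = consumer;
                  }

                  /** Checkpoint thread: publish the offsets and interrupt the blocking poll(). */
                  void requestCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
                      if (nextOffsetsToCommit.getAndSet(offsets) != null) {
                          // only now do we know precisely that the previous request is skipped
                          LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
                                  "Skipping commit of previous offsets because newer offsets are available.");
                      }
                      consumer.wakeup(); // makes poll() return so the commit happens promptly
                  }

                  /** Consumer thread, once per poll-loop iteration: start at most one async commit. */
                  void maybeCommit(OffsetCommitCallback callback) {
                      Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
                      if (toCommit != null && !commitInProgress) {
                          // the callback must reset commitInProgress when the commit completes;
                          // if a commit is still in progress, these offsets are dropped and
                          // subsumed by the next checkpoint's offsets
                          commitInProgress = true;
                          consumer.commitAsync(toCommit, callback);
                      }
                  }
              }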

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.0 via 92f4539afc714f7dbd293c3ad677b3b5807c6911
          • 1.1.3 via 90d77594fffda1d8d15658d363c478ea6430514e

            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
             • Votes:
               0
               Watchers:
               3