Flink / FLINK-5368

Let Kafka consumer show something when it fails to read one topic out of topic list

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      As a developer reading data from many topics, I want the Kafka consumer to report something if any topic is not available. The motivation is that we read many topics as a list at once, and sometimes we fail to notice that one or two topic names have been changed or deprecated, while the Flink Kafka connector doesn't show any error.

      My proposed change is either to throw a RuntimeException or to call LOG.error(topic + " doesn't have any partitions") when partitionsForTopic is null in this function:
      https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java#L208

      Any suggestion is welcome.
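A minimal sketch of the two options proposed above, for illustration only. It does not use the real Kafka client: the hypothetical `metadata` map stands in for the per-topic partition lookup (a topic missing from the map yields `null`, like `partitionsForTopic` in the linked code), partitions are plain `"topic-partition"` strings, `java.util.logging` replaces the connector's logger, and `failOnMissingTopic` is an invented switch between throwing and logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for the partition lookup in the consumer constructor.
// `metadata` plays the role of the Kafka partition query: an unknown topic
// maps to null (here: is simply absent from the map).
final class ProposedTopicCheck {
    private static final Logger LOG = Logger.getLogger(ProposedTopicCheck.class.getName());

    static List<String> discover(List<String> topics,
                                 Map<String, List<Integer>> metadata,
                                 boolean failOnMissingTopic) {
        List<String> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitionsForTopic = metadata.get(topic);
            if (partitionsForTopic != null) {
                for (int p : partitionsForTopic) {
                    partitions.add(topic + "-" + p);
                }
            } else if (failOnMissingTopic) {
                // Option 1: stop the whole consumer on the first missing topic.
                throw new RuntimeException(topic + " doesn't have any partitions");
            } else {
                // Option 2: report the missing topic and keep reading the others.
                LOG.severe(topic + " doesn't have any partitions");
            }
        }
        return partitions;
    }
}
```

With the flag set to `true`, a single renamed topic in the list stops the whole consumer; with `false`, the remaining topics are still read and the missing one is only visible in the log.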


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user HungUnicorn opened a pull request:

          https://github.com/apache/flink/pull/3036

          FLINK-5368 Throw exception if kafka topic doesn't exist

          As a developer reading data from many topics, I want the Kafka consumer to report something if any topic is not available.

          The motivation is that we read many topics as a list at once, and sometimes we fail to notice that one or two topic names have been changed or deprecated, while the Flink Kafka connector didn't show any error.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/HungUnicorn/flink master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3036.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3036


          commit ddf5e8dbb8c5a35ad5df3b11baf334d3b139795c
          Author: HungUnicorn <unicorn.banachi@gmail.com>
          Date: 2016-12-21T10:08:35Z

          FLINK-5368 Throw exception if kafka topic doesn't exist


          githubbot ASF GitHub Bot added a comment -

          Github user DieBauer commented on the issue:

          https://github.com/apache/flink/pull/3036

          Looks like the build job ran out of memory:
          ```
          Running org.apache.flink.api.scala.ScalaShellITCase
          Running org.apache.flink.api.scala.ScalaShellLocalStartupITCase
          Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 16.355 sec - in org.apache.flink.api.scala.ScalaShellLocalStartupITCase
          java.lang.OutOfMemoryError: Java heap space
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3036#discussion_r93814607

          --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
          @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
                   if (partitionsForTopic != null) {
                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                   }
          +        else{
          +            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topic);
          --- End diff --

          I think throwing an exception here is not a good idea.
          If you have 10 topics and only one of them doesn't have any partitions, the consumer won't start at all.
          Also, the exception message is not accurate: it says Flink cannot retrieve ANY partitions for the requested topicS, but only the partitions of one topic are missing.

          I suggest only logging an INFO-level message in that case.

          githubbot ASF GitHub Bot added a comment -

          Github user HungUnicorn commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3036#discussion_r93916373

          --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
          @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
                   if (partitionsForTopic != null) {
                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                   }
          +        else{
          +            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topic);
          --- End diff --

          Agreed. I will replace the exception with an INFO-level log message.

          githubbot ASF GitHub Bot added a comment -

          Github user HungUnicorn closed the pull request at:

          https://github.com/apache/flink/pull/3036

          githubbot ASF GitHub Bot added a comment -

          GitHub user HungUnicorn reopened a pull request:

          https://github.com/apache/flink/pull/3036

          FLINK-5368 Throw exception if kafka topic doesn't exist

          As a developer reading data from many topics, I want the Kafka consumer to report something if any topic is not available.

          The motivation is that we read many topics as a list at once, and sometimes we fail to notice that one or two topic names have been changed or deprecated, while the Flink Kafka connector didn't show any error.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/HungUnicorn/flink master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3036.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3036


          commit b632edead770fd8386a65b6f67c739ad9c280a7c
          Author: HungUnicorn <unicorn.banachi@gmail.com>
          Date: 2016-12-27T10:37:30Z

          FLINK-5368 log msg if kafka topic doesn't have any partitions


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3036#discussion_r94748197

          --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
          @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
                   if (partitionsForTopic != null) {
                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                   }
          +        else{
          +            LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic);
          +        }
               }
           }

          -if (partitions.isEmpty()) {
          -    throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
          -}
          --- End diff --

          I think we still want to fail if there are no partitions at all for ALL topics, correct?
          This change would remove that behaviour.

          githubbot ASF GitHub Bot added a comment -

          Github user HungUnicorn commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3036#discussion_r94943539

          --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
          @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
                   if (partitionsForTopic != null) {
                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                   }
          +        else{
          +            LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic);
          +        }
               }
           }

          -if (partitions.isEmpty()) {
          -    throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
          -}
          --- End diff --

          Throwing an exception fits our case better than logging an INFO message, but if I understood correctly, @rmetzger suggested that some cases might be fine, so logging at INFO is more general.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3036#discussion_r94956342

          --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
          @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> d
                   if (partitionsForTopic != null) {
                       partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                   }
          +        else{
          +            LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic);
          +        }
               }
           }

          -if (partitions.isEmpty()) {
          -    throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
          -}
          --- End diff --

          I think Robert was suggesting logging messages only for cases where some topics don't have partitions, so that the consumer doesn't fail just because, for example, 1 out of 10 topics has no partitions.

          If ALL topics failed to return partitions in the end, we probably should still fail the consumer, like before.
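The combination discussed in this thread (log each topic that returns no partitions, but still fail when the whole topic list yields nothing) can be sketched roughly as follows. This is an illustration, not the merged Flink code: the hypothetical `metadata` map stands in for the Kafka partition lookup (an absent topic yields `null`), partitions are plain `"topic-partition"` strings, and `java.util.logging` replaces the connector's SLF4J logger, so the `{}` placeholder becomes string concatenation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Sketch: skip (and log) individual topics without partitions, but keep the
// earlier fail-fast behaviour when none of the requested topics has any.
final class TopicPartitionDiscovery {
    private static final Logger LOG = Logger.getLogger(TopicPartitionDiscovery.class.getName());

    static List<String> discover(List<String> topics, Map<String, List<Integer>> metadata) {
        List<String> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitionsForTopic = metadata.get(topic);
            if (partitionsForTopic != null) {
                for (int p : partitionsForTopic) {
                    partitions.add(topic + "-" + p);
                }
            } else {
                // One missing topic must not stop the others from being consumed.
                LOG.info("Unable to retrieve any partitions for the requested topic: " + topic);
            }
        }
        if (partitions.isEmpty()) {
            // Nothing to read at all: fail the consumer, as before the change.
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
        }
        return partitions;
    }
}
```

The per-topic message names the single topic, which also addresses the earlier remark that "the requested topics" was inaccurate when only one topic is missing.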

          githubbot ASF GitHub Bot added a comment -

          Github user Rohithyeravothula commented on the issue:

          https://github.com/apache/flink/pull/3036

          Is the issue still open?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3036

          +1 to merge. Thank you for the reminder @Rohithyeravothula.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3036

          Merging this ...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3036

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master with f266e825557f4091094a866e6887f52ca54ff2d7.
          Resolved for 1.2 with 69492763444a9cc5efb1e26e2864abce71787211.

          Thanks for your contribution, Sendoh!


            People

            • Assignee: HungChang Sendoh
            • Reporter: HungChang Sendoh
            • Votes: 0
            • Watchers: 3
