Flink / FLINK-6288

FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      The FlinkKafkaProducerBase supports routing records to topics besides the default topic, but the custom Partitioner interface does not follow this semantic.

      The partitioner's partition method is always invoked with the number of partitions of the default topic, not the number of partitions of the current targetTopic.
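A minimal sketch of the problem (illustrative names only, not actual Flink source): the old-style partitioner only ever sees the default topic's partition count, so a record routed to a different target topic can be assigned a partition that does not exist there.

```java
public class DefaultTopicPartitionerDemo {

    interface OldStylePartitioner<T> {
        // partitions comes from the default topic and is fixed at open() time
        void open(int[] partitions, int subtaskIndex, int numSubtasks);
        int partition(T record, byte[] key, byte[] value, int numPartitions);
    }

    static class LengthPartitioner implements OldStylePartitioner<String> {
        private int[] partitions;

        @Override
        public void open(int[] partitions, int subtaskIndex, int numSubtasks) {
            this.partitions = partitions; // cached once, default topic only
        }

        @Override
        public int partition(String record, byte[] key, byte[] value, int numPartitions) {
            // numPartitions is always the default topic's count in the old code
            return partitions[record.length() % numPartitions];
        }
    }

    public static void main(String[] args) {
        LengthPartitioner p = new LengthPartitioner();
        p.open(new int[] {0, 1, 2, 3}, 0, 1); // default topic: 4 partitions

        // The producer invokes partition() with the default topic's count (4),
        // even though this record targets a topic with only 2 partitions.
        int chosen = p.partition("abc", null, null, 4);
        int targetTopicPartitionCount = 2;
        System.out.println("chosen=" + chosen
                + " validForTarget=" + (chosen < targetTopicPartitionCount));
        // prints: chosen=3 validForTarget=false
    }
}
```

Here the record lands on partition 3 of a topic that only has partitions 0 and 1, which is exactly the failure mode this issue describes.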

        Issue Links

          Activity

          zjureel Fang Yong added a comment -

          Hi Tzu-Li (Gordon) Tai, I'll pick this issue if nobody follows it, thanks.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Sure, please go ahead. And thanks for picking this up!

          By the way, just to make sure you're up to sync with the whole picture of this JIRA, here's the mailing list thread where the discussion of this issue took place: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Possible-bug-in-Kafka-producer-partitioning-logic-td16972.html.

          cc Robert Metzger, in case you would like to chime in any early discussions here, as I think this would require a change in our Kafka sink's custom partitioner API (see ML thread for details).

          zjureel Fang Yong added a comment - - edited

          Thanks for providing the mailing list thread for this issue; it is very helpful. As discussed on the mailing list, I agree with adding an extra API to support a partitioner for each topic. I think a field named Map<String, PartitionerInfo> topicPartitionerMap should be added to FlinkKafkaProducerBase, where the PartitionerInfo POJO contains fields such as the topic and its partitions.

          Users could call an extra API named addTopicPartitioner on FlinkKafkaProducerBase to register a topic-specific partitioner. All topics and partitioners in topicPartitionerMap would be initialized in the open method of FlinkKafkaProducerBase. When new data arrives at the sink and the target topic is in topicPartitionerMap, that topic's specific Partitioner would be used to partition the data.

          githubbot ASF GitHub Bot added a comment -

          GitHub user fanyon opened a pull request:

          https://github.com/apache/flink/pull/3766

          FLINK-6288 fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

          1. Add an extra API, addTopicPartitioner, that users can call to register a special topic and partitioner
          2. Add topicPartitionerMap to FlinkKafkaProducerBase to store the topics and partitioners
          3. Add PartitionerInfo to manage the topic and partitioner info

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fanyon/flink FLINK-6288

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3766.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3766


          commit a525fe605c25ce2e3c8c30cbc7c60542243c0a18
          Author: mengji.fy <mengji.fy@taobao.com>
          Date: 2017-04-24T06:16:48Z

          FLINK-6288 fix target topic uses partitioner of default topic

          commit 071e06c00e8a2346d4ebcede8784f1ada5457da2
          Author: mengji.fy <mengji.fy@taobao.com>
          Date: 2017-04-25T02:03:08Z

          add serialVersionUID field in PartitionerInfo


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          Thanks for the PR @fanyon. I'll try to look at the changes soon.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3766

          Hi,
          Thanks for the PR!

          The first problem I noticed with this approach is that it will not work if users want to partition dynamically created topics (my use case actually).

          We should have a default partitioner that could be applied to the unmatched topics and would always pass the correct partition number.

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          @gyfora Thanks for your comment, and you raise a very good point. I originally assumed that users would know all the output topics when the job is submitted, but in real business scenarios data may be written to dynamically generated topics.

          To support dynamically generated topics, I propose adjusting the open and partition APIs of KafkaPartitioner as follows:
          1. The open method drops the int[] partitions parameter and is invoked once per partitioner:
          public void open(int parallelInstanceId, int parallelInstances)
          2. The partition method gains int[] partitions and target-topic parameters:
          public int partition(T next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions)

          @gyfora @tzulitai What do you think of this? Please feel free to give any suggestions, thanks!
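A sketch of the API shape proposed in this comment (class and method names are illustrative, taken from the discussion rather than the final Flink code). Because partition() now receives the target topic and that topic's own partition array, the chosen partition is always valid, even for dynamically created topics:

```java
public class ProposedPartitionerSketch {

    abstract static class TopicAwarePartitioner<T> implements java.io.Serializable {
        // no int[] partitions here anymore; opened once per partitioner
        public void open(int parallelInstanceId, int parallelInstances) {}

        public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue,
                                      String topic, int[] partitions);
    }

    static class RoundRobinPartitioner<T> extends TopicAwarePartitioner<T> {
        private int counter;

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue,
                             String topic, int[] partitions) {
            // partitions belongs to the target topic, so indexing is always safe
            return partitions[(counter++ & Integer.MAX_VALUE) % partitions.length];
        }
    }

    public static void main(String[] args) {
        RoundRobinPartitioner<String> p = new RoundRobinPartitioner<>();
        // topic "a" has 2 partitions, topic "b" has 3 — both handled correctly
        System.out.println(p.partition("r1", null, null, "a", new int[] {0, 1}));
        System.out.println(p.partition("r2", null, null, "b", new int[] {0, 1, 2}));
    }
}
```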

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3766

          How would this new API map to the current one in terms of backwards compatibility?

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          The new API should live in a new class, such as FlinkKafkaPartitioner. Older KafkaPartitioner implementations would be delegated to by a FlinkKafkaAdapterPartitioner extends FlinkKafkaPartitioner, which holds the defaultTopicId, the partitions, and the KafkaPartitioner delegate, and maps the new API onto the current one. Of course, as is the case today, the default topic's partitions would be used for all the different target topics in FlinkKafkaAdapterPartitioner.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3766

          I think this is reasonable, as the current implementation doesn't work for dynamically created topics. (We should also deprecate the current one.)

          But let's hear what others say.

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          @gyfora For the method `int partition(T next, byte[] serializedKey, byte[] serializedValue, int partitionNum)` in KafkaPartitioner, the correct partition count of the target topic can be passed in. But the KafkaPartitioner's partition id array is initialized in `void open(int parallelInstanceId, int parallelInstances, int[] partitions)`, which is executed only once, so yes, the problem with dynamically created topics will still exist for users running older jobs on the older KafkaPartitioner API, and I find it hard to solve this problem completely.

          What do you think of this? @tzulitai

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          Nice following the discussions here. Let me wrap up the discussion so far:

          The old way -
          ```
          interface KafkaPartitioner {
              void open(int[] partitions, int subtaskIndex, int numSubtasks);
              int partition(T record, byte[] key, byte[] value, int numPartitions);
          }
          ```

          The (last) proposed new way -
          ```
          interface FlinkKafkaPartitioner {
              void open(int subtaskIndex, int numSubtasks);
              int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
          }
          ```
          and have an internal cache of partitioner information: `Map<String, PartitionerInfo>`.
          The `PartitionerInfo` can actually just be the partition id array; I don't think we need another wrapper class if we just need a single `FlinkKafkaPartitioner` per subtask for all (including dynamic) topics.

          I like the proposal of the new partitioner, as users then do not need to provide multiple partitioners. The only question is how well this works for the general use case, because implementations of the new `partition` method then need to handle different topics (which probably makes sense, because we generally want to treat topics as dynamic anyway). The new way can also allow us to handle upscaled target topics in the future.

          For migration, for the dummy wrapper delegation, I think we should just mimic the wrong, old behaviour. That was the behaviour it had always been anyway, so we should not try to alter it if the user is still using the old API. Deprecation and Javadoc messages are responsible for pushing users to the new API.
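A hedged sketch of this migration wrapper (illustrative names, not the actual Flink class): the new-style partitioner delegates to an old-style one and deliberately reproduces the old behaviour, always passing the default topic's partition count regardless of the target topic.

```java
public class CompatWrapperSketch {

    interface OldPartitioner<T> {
        void open(int[] partitions, int subtaskIndex, int numSubtasks);
        int partition(T record, byte[] key, byte[] value, int numPartitions);
    }

    static class OldApiWrapper<T> {
        private final OldPartitioner<T> delegate;
        private final int[] defaultTopicPartitions;

        OldApiWrapper(OldPartitioner<T> delegate, int[] defaultTopicPartitions,
                      int subtaskIndex, int numSubtasks) {
            this.delegate = delegate;
            this.defaultTopicPartitions = defaultTopicPartitions;
            // old contract: open once with the default topic's partitions
            delegate.open(defaultTopicPartitions, subtaskIndex, numSubtasks);
        }

        // New-style signature; the target topic and its partitions are ignored
        // on purpose, mimicking the old (wrong) behaviour for migrated jobs.
        int partition(T record, byte[] key, byte[] value,
                      String targetTopic, int[] partitions) {
            return delegate.partition(record, key, value, defaultTopicPartitions.length);
        }
    }
}
```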

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          One thing to be careful with, though:
          Since we're now querying Kafka for partition metadata within the `invoke` method, the query must be handled robustly to make sure it doesn't result in unexpectedly long checkpoint times by blocking the whole stream at the Kafka sink.

          Most notably, we need to consider the corner cases where Kafka isn't cooperating nicely:
          1. How do we handle arbitrarily long response times when fetching the partition metadata?
          2. How do we handle the case where, due to some Kafka brokers being temporarily unavailable, the returned partition list is incomplete?

          For 2., I can also foresee having a separate "partitions update thread" that refreshes the `Map<String, int[]>` cache continuously at a fixed interval. This could also evolve into a `FlinkKafkaPartitioner` that provides a dynamically changing `int[] partitions` when invoking the `partition` method.

          Perhaps we shouldn't include that in this PR, as it's orthogonal to the API change. Just some food for thought.
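The "partitions update thread" idea can be sketched as follows (illustrative, not Flink code, and the fetcher function stands in for the real Kafka metadata query): a background task refreshes a topic-to-partitions cache at a fixed interval, so the partitioner reads reasonably fresh metadata without blocking the record path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class PartitionCacheRefresher implements AutoCloseable {

    private final Map<String, int[]> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public PartitionCacheRefresher(Function<String, int[]> fetcher, long intervalMs) {
        // periodically re-fetch partitions for every topic already in the cache
        scheduler.scheduleAtFixedRate(
                () -> cache.replaceAll((topic, old) -> fetcher.apply(topic)),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public int[] partitionsFor(String topic, Function<String, int[]> fetcher) {
        // first lookup populates the cache; later refreshes happen in background
        return cache.computeIfAbsent(topic, fetcher);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```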

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          @tzulitai Thank you for your reply. For 2, I think a new issue can be created later.

          For 1, it is indeed a problem that could block the running job. There are two possible approaches:

          1. Depend on Kafka's timeout mechanism: set appropriate timeout configurations when fetching partition metadata from Kafka.
          2. Use a Future to fetch Kafka's partition metadata, and let the user set the timeout via configuration.

          With the first approach the problem may still occur due to network and other issues, so I lean toward the second.
          cc @gyfora
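The second approach can be sketched like this (illustrative, not the actual PR code): run the metadata fetch on an executor and bound the wait with Future.get, so a hanging broker cannot block the sink indefinitely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedMetadataFetch {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public int[] fetchPartitions(Callable<int[]> metadataQuery, long timeoutMs) {
        Future<int[]> future = executor.submit(metadataQuery);
        try {
            // block at most timeoutMs instead of waiting forever
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop the hung query
            throw new RuntimeException("Fetching partition metadata timed out", e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}
```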

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3766

          I like the proposed API and I agree that it's probably best to keep the old behaviour for the deprecated API.

          I don't think the Kafka partition info fetching should be a huge problem, as it shouldn't happen too often and Kafka should be able to return the info if you can write to it. We of course need some timeout/retry mechanism to avoid failing unnecessarily.

          The producer itself is not very resilient to errors in its current state, as it can't really handle async errors; it will just throw them and fail.

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          As discussed above, I have updated the API and code. Please review the changes when you have time, and feel free to give me any suggestions, thanks! @gyfora @tzulitai

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3766#discussion_r114282783

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java —
          @@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
          }
          }

          + protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
          +     Future<int[]> future = executor.submit(new PartitionMetaTask(topic, producer));
          +
          +     try {
          +         return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
          +     } catch (Exception e) {
          +         throw new RuntimeException(e);
          — End diff –

          Should we maybe retry here a few times?
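The retry idea raised here could look roughly like this (a hedged sketch, not the PR's code): wrap the timed fetch in a small bounded retry loop before giving up and failing the sink.

```java
import java.util.concurrent.Callable;

public class RetryingFetch {

    public static int[] fetchWithRetries(Callable<int[]> fetch, int maxAttempts) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.call();
            } catch (Exception e) {
                last = e; // remember the failure and retry
            }
        }
        throw new RuntimeException("Fetching partitions failed after "
                + maxAttempts + " attempts", last);
    }
}
```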

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3766#discussion_r114466534

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java —
          @@ -281,6 +307,47 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
          }
          }

          + protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
          +     Future<int[]> future = executor.submit(new PartitionMetaTask(topic, producer));
          +
          +     try {
          +         return future.get(kafkaMetaTimeoutMs, TimeUnit.MILLISECONDS);
          +     } catch (Exception e) {
          +         throw new RuntimeException(e);
          — End diff –

          Yes, a retry here would be nicer; I'll fix it, thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3766

          @tzulitai should we try to get this in the release?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          @fanyon I'll finally have some time to get back to this PR this week (perhaps over the next 2 days). Thanks a lot for your patience ...

          @gyfora I'm personally a +1 to try to get this in the release because it really is a self-contained thing, but I think it'll probably depend on the status of the 1.3 release in the end.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          @zjureel could you rebase the PR on the latest master? Otherwise I cannot review the PR like this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          Seems like the relevant commits are e6ec702 and 64af26e.
          Let me try to resolve this ..

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3766

          @tzulitai Thank you for your reply.

          For 1, the `ExecutorService` is used to control the timeout of fetching Kafka partitions. When fetching Kafka partitions, a `Future` is created and executed in the `ExecutorService`, which waits for some milliseconds and throws an exception on timeout.
          For 2, I have deprecated the constructors of the 08 / 09 / 010 connectors whose parameter is `KafkaPartitioner` and added the same constructors with a `FlinkKafkaPartitioner` parameter.

          I found that the code in the master of apache/flink was changed quite substantially some days ago, and I will try to rebase onto those changes soon. I think you can review these issues after that, thank you.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          @zjureel thanks. You would need a proper rebase: `git rebase master` when you finish your feature branch, instead of merging the latest master.

          Regarding timeout: doesn't the Kafka client have built-in timeout functionality?
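On the built-in timeout question: the Kafka producer client does expose timeout-related settings through its Properties-based configuration. A hedged sketch (the keys below are standard Kafka producer configs, but their exact behaviour depends on the client version in use, and the values are illustrative):

```java
import java.util.Properties;

public class ProducerTimeoutConfig {

    public static Properties withTimeouts(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // upper bound on how long send()/metadata fetches may block
        props.setProperty("max.block.ms", "10000");
        // per-request timeout for broker responses
        props.setProperty("request.timeout.ms", "15000");
        return props;
    }
}
```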

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3766

          @zjureel the rebases don't seem to have been done correctly. The PR should contain only the diff commits.
          I'm not sure what went wrong, but perhaps the easiest way right now is to cherry-pick your diff commits onto a new branch checked out from the latest master.

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3766

          @tzulitai Thank you for your suggestion; I think you are right. I will create a new PR from master and cherry-pick my commits for this issue soon.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zjureel opened a pull request:

          https://github.com/apache/flink/pull/3901

          FLINK-6288 fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic

          …voked with number of partitions of default topic

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zjureel/flink FLINK-6288

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3901.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3901


          commit 301dfc6247645b690b4508abb2245ce5990321c0
          Author: zjureel <zjureel@gmail.com>
          Date: 2017-05-15T09:41:47Z

          FLINK-6288 fix FlinkKafkaProducer's custom Partitioner is always invoked with number of partitions of default topic


          githubbot ASF GitHub Bot added a comment -

          Github user fanyon commented on the issue:

          https://github.com/apache/flink/pull/3766

          @tzulitai I have created a new PR at https://github.com/apache/flink/pull/3901 and will close this one; you can review the code there, thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user fanyon closed the pull request at:

          https://github.com/apache/flink/pull/3766

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3901

          @tzulitai I created the new PR here for FLINK-6288. As discussed in https://github.com/apache/flink/pull/3766, there are two ways to control the timeout of fetching Kafka metadata:

          1. Rely on Kafka's own timeout mechanism: when fetching partition metadata from Kafka, the relevant timeout configurations should be set.
          2. Use a `Future` to fetch the partition metadata, with a timeout the user can set via configuration.

          In the new API, Kafka metadata may be fetched before data is sent, and problems such as network or disk failures may still block the send even when the Kafka request timeout is set. So in this PR, I use a `Future` to control the timeout of fetching Kafka metadata.

          The problem may not be that complicated, though, and we could use Kafka's timeout mechanism directly. What do you think? Thanks.
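
          The `Future`-based approach described above can be sketched as follows. Note that `fetchPartitionsFromKafka` is a hypothetical stand-in for the real broker metadata call (e.g. the producer's `partitionsFor(topic)`); only the timeout wrapping is the point here:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of option 2: wrap the (potentially blocking) metadata fetch
 * in a Future and bound it with a caller-side timeout.
 */
public class MetadataFetchSketch {

    // Hypothetical placeholder for the real broker round-trip.
    static List<Integer> fetchPartitionsFromKafka(String topic) {
        return Arrays.asList(0, 1, 2);
    }

    static List<Integer> fetchWithTimeout(String topic, long timeoutMs)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<Integer>> future =
                executor.submit(() -> fetchPartitionsFromKafka(topic));
            // Throws TimeoutException if the fetch blocks longer than timeoutMs.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchWithTimeout("my-topic", 1000));
    }
}
```

          The trade-off debated in this thread is that this adds an executor and a second timeout layer on top of the one the Kafka client already provides.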

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          @zjureel my main concern is the need for our own timeout when Kafka already has the `max.block.ms` configuration. Isn't that the max timeout you're talking about?
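
          For context, `max.block.ms` is a standard Kafka producer client setting that caps how long calls such as `send()` and `partitionsFor()` may block, for example while fetching metadata. A minimal sketch of configuring it (the values here are illustrative, not recommendations):

```java
import java.util.Properties;

/**
 * Kafka's built-in bound on blocking producer calls: max.block.ms caps
 * how long send() / partitionsFor() may block (e.g. fetching metadata),
 * which is the alternative to a hand-rolled timeout discussed above.
 */
public class ProducerTimeoutConfig {
    public static Properties build(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("max.block.ms", "10000");       // bound on metadata fetch / send blocking
        props.setProperty("request.timeout.ms", "30000"); // per-request broker timeout
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("max.block.ms"));
    }
}
```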

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3901

          Yes, the user can configure the timeout and retry count for fetching Kafka metadata beyond Kafka's own configuration. @tzulitai

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          @zjureel then I would suggest removing the use of `ExecutorService` for the timeout. In what other cases would it be required?

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3901

          @tzulitai I agree with you; the `ExecutorService` is really unnecessary. I will update the code and remove it.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          This looks good! I'll do some final tweaks to the code styling and then merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          @zjureel I'll be merging this only for 1.4-SNAPSHOT. The main reason is that there is a lot of deprecation / API change involved, and I'd like to ship it along with other API changes we had in mind, to make sure they go together nicely.

          This is of the highest priority at the moment, and I'll make sure it gets into 1.4-SNAPSHOT as soon as possible.

          @gyfora I understand that this was something of a blocker for you, but I would prefer not to rush the changes into the release. I hope you can understand.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3901

          Well, I can understand that, but it will mean that we have to keep running a custom build, because there is no way to work around this nicely.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3901

          On the other hand, this is potentially causing major data skew or errors for anyone using dynamic topics (they might not even be aware of it).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          I think you've made a good point Gyula.
          Alright, let me try to stretch a bit and merge this for 1.3.0 also.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/3901

          Thanks @tzulitai. I don't feel very strongly either way; I am just concerned for other users. I'll leave the decision to you, as I know you are already flooded with other work around the release.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3901

          I'm currently waiting on a Travis run for https://github.com/tzulitai/flink/tree/FLINK-6288.
          That branch also includes more tests and cleanup tweaks for the feature.

          Once it's green, I will merge to `master` and `release-1.3.0`.

          githubbot ASF GitHub Bot added a comment -

          Github user zjureel commented on the issue:

          https://github.com/apache/flink/pull/3901

          @gyfora Thanks for your suggestion; I agree that problems which cause errors for users should be addressed with higher priority.

          Thank you for merging this issue @tzulitai

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3901

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via e3fcbb087568a33aa03f58ba0dc359b1a6b02bfd.
          Fixed for 1.3 via d3b587096b1fd694625429aa32557e70ed84955d.


            People

            • Assignee:
              zjureel Fang Yong
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development