  1. Flink
  2. FLINK-6998

Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      Proposal: add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics to the KafkaConsumerThread class.
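Below is a minimal, self-contained sketch of what the proposal amounts to: two counters, one per commit outcome, updated from the commit-completion callback. `CommitCounter` and `PlainCommitCounter` are hypothetical stand-ins for Flink's `org.apache.flink.metrics.Counter`, which lets the sketch run without Flink on the classpath. In the connector, the counters would instead be registered on the operator's metric group, as the PR does with `getRuntimeContext().getMetricGroup().counter("commitsSucceeded")` and `counter("commitsFailed")`. `CommitMetricsSketch` and `recordCommitResult` are illustrative names only.

```java
// Hypothetical stand-in for org.apache.flink.metrics.Counter (only inc()/getCount() are needed here).
interface CommitCounter {
    void inc();
    long getCount();
}

// Hypothetical minimal implementation: a plain, unsynchronized long.
final class PlainCommitCounter implements CommitCounter {
    private long count;

    @Override public void inc() { count++; }
    @Override public long getCount() { return count; }
}

// Hypothetical holder for the two metrics proposed in this issue.
final class CommitMetricsSketch {
    final CommitCounter commitsSucceeded = new PlainCommitCounter();
    final CommitCounter commitsFailed = new PlainCommitCounter();

    // Called once per completed commit request. A null exception means success,
    // matching the single-method onComplete(Exception) contract first proposed in the PR.
    void recordCommitResult(Exception exception) {
        if (exception == null) {
            commitsSucceeded.inc();
        } else {
            commitsFailed.inc();
        }
    }

    public static void main(String[] args) {
        CommitMetricsSketch metrics = new CommitMetricsSketch();
        metrics.recordCommitResult(null);
        metrics.recordCommitResult(null);
        metrics.recordCommitResult(new RuntimeException("broker unavailable"));
        // prints "2 succeeded, 1 failed"
        System.out.println(metrics.commitsSucceeded.getCount() + " succeeded, "
                + metrics.commitsFailed.getCount() + " failed");
    }
}
```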


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for the contribution!
          Fixed for release-1.3 via 4382464fd140aae70767a3c9776cf27f3b57355b.
          Fixed for master via 1ded2d867d9c30f01395494adec3cbaa629bbb5a.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4187

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4187

          Merging ..

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r130058523

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java —
          @@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception

          if (ex != null) {
          log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
          + if (callerCommitCallback != null) {
          — End diff –

          See my comment above. Would like to remove these null checks.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r130058545

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java —
          @@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception

          if (ex != null) {
          log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
          + if (callerCommitCallback != null) {
          + callerCommitCallback.onException(ex);
          + }
          + }
          + else if (callerCommitCallback != null) {
          — End diff –

          See my comment above. Would like to remove these null checks.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r130058480

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java —
          @@ -346,15 +346,21 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
          // ------------------------------------------------------------------------

          @Override

          - public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
          + public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
          ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
          if (zkHandler != null) {
          try {
          // the ZK handler takes care of incrementing the offsets by 1 before committing
          zkHandler.prepareAndCommitOffsets(offsets);
          + if (commitCallback != null) {
          + commitCallback.onSuccess();
          + }
          }
          catch (Exception e) {
          if (running) {
          + if (commitCallback != null) {
          --- End diff --

          See my comment above. Would like to remove these null checks.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r130058073

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java —
          @@ -308,17 +311,33 @@ public void shutdown() {
          *
          * <p>Only one commit operation may be pending at any time. If the committing takes longer than
          * the frequency with which this method is called, then some commits may be skipped due to being
          - * superseded by newer ones.
          + * superseded by newer ones.
          *
          * @param offsetsToCommit The offsets to commit
          */
          public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
          --- End diff --

          I don't think we need this variant anymore. The only user of the method is `Kafka09Fetcher`, anyways.
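The Javadoc under review describes a "latest wins" hand-off: only one commit may be pending, and a newer request supersedes one that has not yet been processed. Below is a minimal sketch of that hand-off with a single entry point that always takes a callback, which is the shape the comment argues for. It assumes hypothetical names (`PendingCommitHandOff`, `PendingCommit`, `takePending`, `CommitCallback`) and plain `String`/`Long` maps in place of Kafka's `TopicPartition`/`OffsetAndMetadata`. It is not the actual `KafkaConsumerThread` code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-method commit callback.
interface CommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Offsets plus the callback to notify once they are committed.
final class PendingCommit {
    final Map<String, Long> offsets;
    final CommitCallback callback;

    PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
        this.offsets = offsets;
        this.callback = callback;
    }
}

final class PendingCommitHandOff {
    // At most one pending commit; a newer request replaces an unprocessed older one.
    private final AtomicReference<PendingCommit> pending = new AtomicReference<>();

    // Single entry point, with no overload that omits the callback.
    // Returns true if an older, not-yet-processed commit was superseded.
    // Note: in this sketch a superseded commit's callback is simply dropped.
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        PendingCommit previous = pending.getAndSet(
                new PendingCommit(Objects.requireNonNull(offsets), Objects.requireNonNull(callback)));
        return previous != null;
    }

    // Polled by the consumer thread; returns null if nothing is pending.
    PendingCommit takePending() {
        return pending.getAndSet(null);
    }

    public static void main(String[] args) {
        PendingCommitHandOff handOff = new PendingCommitHandOff();
        CommitCallback noop = new CommitCallback() {
            @Override public void onSuccess() { }
            @Override public void onException(Throwable cause) { }
        };
        handOff.setOffsetsToCommit(Collections.singletonMap("topic-0", 10L), noop);
        boolean superseded = handOff.setOffsetsToCommit(Collections.singletonMap("topic-0", 20L), noop);
        PendingCommit next = handOff.takePending();
        System.out.println(superseded + " " + next.offsets.get("topic-0")); // prints "true 20"
    }
}
```

Using `AtomicReference.getAndSet` on both sides lets the committing thread and the consumer thread exchange the slot without a lock.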

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r130058404

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java —
          @@ -346,15 +346,21 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition parti
          // ------------------------------------------------------------------------

          @Override

          - public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
          + public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
          ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
          if (zkHandler != null) {
          try {
          // the ZK handler takes care of incrementing the offsets by 1 before committing
          zkHandler.prepareAndCommitOffsets(offsets);
          + if (commitCallback != null) {
          --- End diff --

          I would actually like to remove these null checks, and have the contract that a callback will always be provided with `@Nonnull` annotation.

          AFAIK, the only reason we need these null checks is that the tests, for simplicity, provide a `null` as the callback. IMO, it isn't a good practice to have logic in the main code just to satisfy testing shortcuts.
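Below is a minimal sketch of the contract suggested here. It assumes a hypothetical `OffsetCommitter` whose blocking in-memory "commit" stands in for the ZooKeeper-based Kafka 0.8 path. The callback parameter is required and is checked eagerly with `Objects.requireNonNull`. In the connector, a `@Nonnull` annotation would document the same contract. As a result, the main code calls the callback unconditionally, and tests pass a shared no-op callback (the hypothetical `NoOpCommitCallback`) instead of `null`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical two-method commit callback.
interface CommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Hypothetical committer; `store` stands in for ZooKeeper.
final class OffsetCommitter {
    private final Map<String, Long> store = new HashMap<>();
    private volatile boolean running = true;

    // The callback is mandatory, so the body needs no null checks around it.
    void commitInternalOffsetsToKafka(Map<String, Long> offsets, CommitCallback commitCallback) {
        Objects.requireNonNull(commitCallback, "commitCallback");
        try {
            store.putAll(offsets); // blocking "commit"; throws NullPointerException for null offsets
            commitCallback.onSuccess();
        } catch (Exception e) {
            // Mirrors the diff: failures are only reported while the fetcher is still running.
            if (running) {
                commitCallback.onException(e);
            }
        }
    }

    long committed(String partition) { return store.getOrDefault(partition, -1L); }

    void stop() { running = false; }
}

// A shared no-op callback that tests can pass instead of null.
final class NoOpCommitCallback implements CommitCallback {
    static final NoOpCommitCallback INSTANCE = new NoOpCommitCallback();

    @Override public void onSuccess() { }
    @Override public void onException(Throwable cause) { }
}
```

With this shape, the "test shortcut" moves into test code (`NoOpCommitCallback.INSTANCE`) rather than into null checks in the main code path.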

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r129663595

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,34 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
          + * which should normally be triggered from checkpoint complete event.
          + */
          +public interface KafkaCommitCallback {
          — End diff –

          Having the two methods would make the on-success case clearer...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r129663458

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,34 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
          + * which should normally be triggered from checkpoint complete event.
          + */
          +public interface KafkaCommitCallback {
          +
          + /**
          + * A callback method the user can implement to provide asynchronous handling of commit request completion.
          + * This method will be called when the commit request sent to the server has been acknowledged.
          + *
          + * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
          + */
          + void onComplete(Exception exception);
          — End diff –

          Most other exception handlers take a `Throwable`. Would it make sense to do that here as well?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r129663181

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,34 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
          + * which should normally be triggered from checkpoint complete event.
          + */
          +public interface KafkaCommitCallback {
          — End diff –

          I think it would be nice to have two methods here: `onSuccess()` and `onException(...)`.
          Or does this have to be a SAM interface so you can use lambdas?
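Below is a sketch of the two-method shape under discussion: `onSuccess()` and `onException(Throwable)`, the same calls that the `KafkaConsumerThread` and `Kafka08Fetcher` diffs in this PR make. It also includes an adapter that dispatches a Kafka-style `onComplete(offsets, exception)` result to that interface. `KafkaStyleCallback` is a hypothetical stand-in for Kafka's `OffsetCommitCallback`, which keeps the example runnable without `kafka-clients`. On the SAM question: once the interface has two abstract methods, callers can no longer pass a single lambda. A static factory that takes two lambdas (the hypothetical `KafkaCommitCallbacks.of`) is one way to keep call sites short.

```java
import java.util.Map;
import java.util.function.Consumer;

// Two-method callback, as suggested in the review (not a SAM type, so no direct lambdas).
interface KafkaCommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Hypothetical stand-in for Kafka's single-method OffsetCommitCallback.
interface KafkaStyleCallback {
    void onComplete(Map<String, Long> offsets, Exception exception);
}

final class KafkaCommitCallbacks {
    private KafkaCommitCallbacks() { }

    // Builds a two-method callback from two lambdas.
    static KafkaCommitCallback of(Runnable successAction, Consumer<Throwable> failureAction) {
        return new KafkaCommitCallback() {
            @Override public void onSuccess() { successAction.run(); }
            @Override public void onException(Throwable cause) { failureAction.accept(cause); }
        };
    }

    // Adapts the "null exception means success" contract to the two-method interface.
    static KafkaStyleCallback adapt(KafkaCommitCallback callerCommitCallback) {
        return (offsets, exception) -> {
            if (exception != null) {
                callerCommitCallback.onException(exception);
            } else {
                callerCommitCallback.onSuccess();
            }
        };
    }
}
```

Taking a `Throwable` in `onException` follows the suggestion above to match other Flink exception handlers, and it still accepts the `Exception` that Kafka reports.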

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on the issue:

          https://github.com/apache/flink/pull/4187

          @tzulitai All tests are passing now. Let me know if this looks OK, and whether you want me to go ahead and squash all commits.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r128078950

          — Diff: docs/monitoring/metrics.md —
          @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          — End diff –

          OK, makes sense.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127966695

          — Diff: docs/monitoring/metrics.md —
          @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          — End diff –

          Let's name this heading "Connectors", and add a new sub-heading specifically for Kafka.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127842765

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception

          { throw new Exception("The partitions were not set for the consumer"); }

          + // initialize commit metrics and default offset callback method
          + this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
          + this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
          +
          + this.offsetCommitCallback = new KafkaCommitCallback() {
          + @Override
          + public void onComplete(Exception exception) {
          + if (exception == null) {
          + successfulCommits.inc();
          — End diff –

          I'll add a detailed Javadoc describing the current unsynchronized implementation. I'm hesitant to add lock protection, because the higher-level abstraction currently guarantees that no commits run concurrently.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127838984

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -185,6 +187,18 @@
          private volatile boolean running = true;

          // ------------------------------------------------------------------------
          + // internal metrics
          + // ------------------------------------------------------------------------
          +
          + /** Counter for successful Kafka offset commits. */
          + private transient Counter successfulCommits;
          +
          + /** Counter for failed Kafka offset commits. */
          + private transient Counter failedCommits;
          +
          + private transient KafkaCommitCallback offsetCommitCallback;
          — End diff –

          Sounds fair. I'll include a Javadoc as well as a notice about the thread-safety contract, as you suggested.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127636523

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -185,6 +187,18 @@
          private volatile boolean running = true;

          // ------------------------------------------------------------------------
          + // internal metrics
          + // ------------------------------------------------------------------------
          +
          + /** Counter for successful Kafka offset commits. */
          + private transient Counter successfulCommits;
          +
          + /** Counter for failed Kafka offset commits. */
          + private transient Counter failedCommits;
          +
          + private transient KafkaCommitCallback offsetCommitCallback;
          — End diff –

          Can you include a Javadoc for this too? (for consistency)

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127636725

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception

          { throw new Exception("The partitions were not set for the consumer"); }

          + // initialize commit metrics and default offset callback method
          + this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
          + this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
          +
          + this.offsetCommitCallback = new KafkaCommitCallback() {
          + @Override
          + public void onComplete(Exception exception) {
          + if (exception == null) {
          + successfulCommits.inc();
          — End diff –

          I would also like to raise a thread-safety issue here.

          Currently, since there is always only one pending offset commit in Kafka 0.9+, and Kafka 0.8 commits in a blocking call, there will be no race condition when incrementing the counters. However, changing these implementations in subclasses (perhaps in the future) can easily introduce race conditions here.

          At the very least, we should probably add a notice about the thread-safety contract in the Javadoc of `commitInternalOffsetsToKafka`.
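          A minimal sketch of what such a documented contract could look like, assuming a simplified class hierarchy. `OffsetCommitter`, `CommitCallback`, `commitOffsets` and `BlockingCommitter` are hypothetical names standing in for the connector's base class, `KafkaCommitCallback` and `commitInternalOffsetsToKafka`; this is not the actual Flink code.

```java
import java.util.Map;

/** Hypothetical stand-in for the PR's KafkaCommitCallback. */
interface CommitCallback {
    /** Invoked once per commit request; {@code exception} is null if the commit succeeded. */
    void onComplete(Exception exception);
}

/** Hypothetical base class showing a thread-safety contract documented on the commit method. */
abstract class OffsetCommitter {

    /**
     * Commits the given partition offsets and reports the outcome through {@code callback}.
     *
     * <p><b>Thread-safety contract:</b> implementations must invoke {@code callback} at most
     * once per call, and callbacks from different calls must never run concurrently with each
     * other. Callers may therefore update non-thread-safe state, such as plain metric counters,
     * inside the callback.
     */
    abstract void commitOffsets(Map<Integer, Long> offsets, CommitCallback callback);
}

/** Illustrative implementation that commits synchronously on the calling thread. */
class BlockingCommitter extends OffsetCommitter {
    @Override
    void commitOffsets(Map<Integer, Long> offsets, CommitCallback callback) {
        // A blocking commit meets the contract trivially: the callback runs on the
        // caller's thread before this method returns. Rejecting negative offsets is a
        // made-up failure condition, used only so that the failure path can be exercised.
        boolean valid = offsets.values().stream().allMatch(offset -> offset >= 0);
        callback.onComplete(valid ? null : new IllegalArgumentException("negative offset"));
    }
}
```

          With the contract stated on the abstract method, the base class can keep incrementing plain counters inside the callback, and any future asynchronous subclass is told explicitly what it must preserve.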

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127636859

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,33 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
          + */
          +public interface KafkaCommitCallback {
          — End diff –

          I'm not entirely sure an interface is needed here. Do we allow "version-specific subclasses" to have their own `KafkaCommitCallback` implementations?

          If not, the interface makes the usage slightly confusing.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127636481

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java —
          @@ -293,12 +296,28 @@ public void shutdown() {
          * @param offsetsToCommit The offsets to commit
          */
          public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
          + setOffsetsToCommit(offsetsToCommit, null);
          + }
          +
          + /**
          + * Tells this thread to commit a set of offsets. This method does not block, the committing
          + * operation will happen asynchronously.
          + *
          + * <p>Only one commit operation may be pending at any time. If the committing takes longer than
          + * the frequency with which this method is called, then some commits may be skipped due to being
          + * superseded by newer ones.
          — End diff –

          Unnecessary empty space.
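          The Javadoc in this hunk describes a "latest wins" hand-off: only one commit may be pending, and a newer request supersedes an older one that the committing thread has not picked up yet. A minimal sketch of that pattern using `AtomicReference#getAndSet`, assuming a hypothetical `PendingCommitSlot` class with integer partition keys; it is not the actual `KafkaConsumerThread` code, and in this sketch the callback of a superseded request is simply never invoked.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical single-slot hand-off between the caller and a committing thread. */
class PendingCommitSlot {

    /** One pending request: the offsets plus the callback to notify when done. */
    static final class Request {
        final Map<Integer, Long> offsets;
        final Consumer<Exception> callback; // receives null on success

        Request(Map<Integer, Long> offsets, Consumer<Exception> callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<Request> next = new AtomicReference<>();

    /**
     * Non-blocking: stores the request for the committing thread.
     * Returns true if an older request that was not yet taken got superseded.
     */
    boolean setOffsetsToCommit(Map<Integer, Long> offsets, Consumer<Exception> callback) {
        return next.getAndSet(new Request(offsets, callback)) != null;
    }

    /** Called by the committing thread; atomically takes the latest request, or null if none. */
    Request takeNext() {
        return next.getAndSet(null);
    }
}
```

          In a committing loop, the consumer thread would call `takeNext()`, perform the commit, and then call `request.callback.accept(...)` with `null` or the failure. Because only that one thread invokes callbacks, the counters they touch are never updated concurrently.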

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127560857

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception

          { throw new Exception("The partitions were not set for the consumer"); }

          + // initialize commit metrics and default offset callback method
          + this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
          + this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
          +
          + this.offsetCommitCallback = new KafkaCommitCallback() {
          + @Override
          + public void onComplete(Exception exception) {
          + if (exception == null) {
          + successfulCommits.inc();
          — End diff –

          This is only invoked on the checkpoint-complete notification, so there is no thread race condition.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127560753

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,33 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
          — End diff –

          I updated the wording. This is only exposed to the Kafka source operator (invoked upon checkpoint complete), not directly to users, so there is no thread race condition either.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127400942

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java —
          @@ -0,0 +1,33 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * A callback interface that the user can implement to trigger custom actions when a commit request completes.
          — End diff –

          Is this callback actually exposed to users?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127400689

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -505,6 +519,21 @@ public void run(SourceContext<T> sourceContext) throws Exception

          { throw new Exception("The partitions were not set for the consumer"); }

          + // initialize commit metrics and default offset callback method
          + this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
          + this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
          +
          + this.offsetCommitCallback = new KafkaCommitCallback() {
          + @Override
          + public void onComplete(Exception exception) {
          + if (exception == null) {
          + successfulCommits.inc();
          — End diff –

          If the callbacks are executed by another thread the result will be inaccurate, as the default counter implementation is not thread-safe.
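          To illustrate the concern, a minimal sketch of a counter that stays accurate when incremented from several threads at once, built on `java.util.concurrent.atomic.LongAdder`. `ConcurrentCommitCounter` is a hypothetical class that only mirrors the `inc()`/`getCount()` methods of Flink's `Counter` interface; it is not a Flink class. With a plain `long` field and `count++` instead, concurrent increments could be lost, which is exactly the inaccuracy described above.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical counter that stays accurate under concurrent increments. */
class ConcurrentCommitCounter {
    private final LongAdder count = new LongAdder();

    void inc() {
        count.increment();
    }

    long getCount() {
        return count.sum();
    }

    /** Increments {@code counter} from {@code threads} threads, {@code perThread} times each. */
    static long incrementFromThreads(ConcurrentCommitCounter counter, int threads, int perThread)
            throws InterruptedException {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // After join(), every increment is visible, so the result is exactly threads * perThread.
        return counter.getCount();
    }
}
```

          `LongAdder` favors write throughput under contention; an `AtomicLong` would work equally well for a low-rate metric such as commit counts.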

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r127400527

          — Diff: docs/monitoring/metrics.md —
          @@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          +<table class="table table-bordered">
          + <thead>
          + <tr>
          + <th class="text-left" style="width: 20%">Scope</th>
          + <th class="text-left" style="width: 30%">Metrics</th>
          + <th class="text-left" style="width: 50%">Description</th>
          + </tr>
          + </thead>
          + <tbody>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          — End diff –

          This documentation is inconsistent with the rest.

          Scope should be "Operator", and you should add an additional "Infix" column which contains the names of the metric groups you are creating, concatenated with a period.
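          A small sketch of how a full identifier is assembled from the operator scope, the "Infix" (the names of the nested metric groups, joined with periods) and the metric name. The `MetricIdentifier` helper, the scope values and the `KafkaConsumer` group name are placeholders chosen for illustration; in Flink itself, nested groups come from `MetricGroup#addGroup` and the scope components come from the configured scope formats.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles a dotted metric identifier. */
final class MetricIdentifier {
    private MetricIdentifier() {}

    /**
     * Joins the operator scope, the infix (names of nested metric groups)
     * and the metric name with periods.
     */
    static String of(List<String> scope, List<String> infix, String metricName) {
        List<String> parts = new ArrayList<>(scope);
        parts.addAll(infix);
        parts.add(metricName);
        return String.join(".", parts);
    }
}
```

          For example, scope `[localhost, taskmanager, tm-1, my-job, Source, 0]`, infix `[KafkaConsumer]` and metric `commitsSucceeded` yield `localhost.taskmanager.tm-1.my-job.Source.0.KafkaConsumer.commitsSucceeded`; the table's "Infix" column would document the `KafkaConsumer` part.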

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on the issue:

          https://github.com/apache/flink/pull/4187

          @tzulitai it seems the last CI pipeline failed because of stability issues. How can I trigger another build without making a commit?

          githubbot ASF GitHub Bot added a comment -

          Github user GJL commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r126283335

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -184,6 +186,18 @@
          /** Flag indicating whether the consumer is still running. */
          private volatile boolean running = true;

          + // ------------------------------------------------------------------------
          — End diff –

          The indentation is off by 1 space.

          githubbot ASF GitHub Bot added a comment -

          Github user GJL commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r126283316

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -104,7 +106,7 @@
          // configuration state, set on the client relevant for all subtasks
          // ------------------------------------------------------------------------

          - /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
          + /** Descrnterbes whether we are discovering partitions for fixed topics or a topic pattern. */
          — End diff –

          This is probably an unintended change.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on the issue:

          https://github.com/apache/flink/pull/4187

          @tzulitai rebase done.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4187

          Hi @zhenzhongxu, could you rebase onto the latest master? Currently the PR contains commits unrelated to the change.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4187

          Please follow the camel-case pattern that we use for other metrics.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4187

          > I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs. log head), especially in failure situations.

          Yes, let's keep that apart from this PR. There is also a JIRA for exactly this feature: https://issues.apache.org/jira/browse/FLINK-6109

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4187

          I agree. "commits-succeeded" and "commits-failed" seem good!

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on the issue:

          https://github.com/apache/flink/pull/4187

          How about just "commits-succeeded" and "commits-failed" as metric names?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4187

          I don't think it matters too much whether a metric is completely measured by Flink or just forwarded from Kafka classes. Having "kafka" in the name also introduces an inherent redundancy, since the scope for, say, the KafkaConsumerThread already contains "KafkaConsumer".

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on the issue:

          https://github.com/apache/flink/pull/4187

          @tzulitai

          *Regarding the metric naming:*
          Any suggestions on naming conventions for these Flink-specific metrics? How do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an example? I personally prefer hyphen separators over camel case for metric names.
          I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on the "offset lag" metric: I think this particular metric is more useful when some external entity performs the monitoring (difference of committed offset vs. log head), especially in failure situations.

          *Regarding the implementation:*
          Thanks for the feedback. I'll explore the more appropriate implementation you suggested and get back to you with a solution or question.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r124413756

          — Diff: docs/monitoring/metrics.md —
          @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          +<table class="table table-bordered">
          + <thead>
          + <tr>
          + <th class="text-left" style="width: 20%">Scope</th>
          + <th class="text-left" style="width: 30%">Metrics</th>
          + <th class="text-left" style="width: 50%">Description</th>
          + </tr>
          + </thead>
          + <tbody>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsSucceeded</td>
          + <td>Kafka offset commit success count if Kafka commit is turned on.</td>
          + </tr>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsFailed</td>
          + <td>Kafka offset commit failure count if Kafka commit is turned on.</td>
          — End diff –

          Sure, thanks for the feedback. Will update.

          githubbot ASF GitHub Bot added a comment -

          Github user zhenzhongxu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r124413737

          — Diff: docs/monitoring/metrics.md —
          @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          +<table class="table table-bordered">
          + <thead>
          + <tr>
          + <th class="text-left" style="width: 20%">Scope</th>
          + <th class="text-left" style="width: 30%">Metrics</th>
          + <th class="text-left" style="width: 50%">Description</th>
          + </tr>
          + </thead>
          + <tbody>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsSucceeded</td>
          + <td>Kafka offset commit success count if Kafka commit is turned on.</td>
          — End diff –

          Sure, thanks for the feedback. Will update.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r124284709

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java —
          @@ -119,6 +126,9 @@ public KafkaConsumerThread(

          this.nextOffsetsToCommit = new AtomicReference<>();
          this.running = true;
          +
          + this.successfulCommits = kafkaMetricGroup.counter("kafkaCommitsSucceeded");
          + this.failedCommits = kafkaMetricGroup.counter("kafkaCommitsFailed");
          — End diff –

          Please see my comment regarding the naming of the metrics.
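The two counters registered in this diff are meant to be incremented from the asynchronous offset-commit callback. A self-contained sketch of that pattern, assuming a callback shaped like Kafka's `OffsetCommitCallback#onComplete(offsets, exception)`, where a `null` exception signals success. `SimpleCounter`, `CommitCallback` and `CommitMetricsCallback` are hypothetical stand-ins for Flink's `Counter` and Kafka's callback type, used only so the example compiles without either dependency; partition keys are simplified to strings.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a metrics counter; AtomicLong lets a reporter thread read it safely. */
final class SimpleCounter {
    private final AtomicLong count = new AtomicLong();

    void inc() {
        count.incrementAndGet();
    }

    long getCount() {
        return count.get();
    }
}

/** Hypothetical stand-in mirroring the shape of Kafka's OffsetCommitCallback. */
interface CommitCallback {
    /** Called once per commit attempt; {@code exception} is null if the commit succeeded. */
    void onComplete(Map<String, Long> offsets, Exception exception);
}

/** Counts successful and failed offset commits reported back by the consumer. */
final class CommitMetricsCallback implements CommitCallback {
    private final SimpleCounter successfulCommits;
    private final SimpleCounter failedCommits;

    CommitMetricsCallback(SimpleCounter successfulCommits, SimpleCounter failedCommits) {
        this.successfulCommits = successfulCommits;
        this.failedCommits = failedCommits;
    }

    @Override
    public void onComplete(Map<String, Long> offsets, Exception exception) {
        if (exception == null) {
            successfulCommits.inc();
        } else {
            // A failed commit does not fail the job here; it is only made visible through the counter.
            failedCommits.inc();
        }
    }
}
```

Keeping the counting inside the callback means every commit attempt is classified exactly once, regardless of which thread triggered the commit.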

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r124235539

          — Diff: docs/monitoring/metrics.md —
          @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          +<table class="table table-bordered">
          + <thead>
          + <tr>
          + <th class="text-left" style="width: 20%">Scope</th>
          + <th class="text-left" style="width: 30%">Metrics</th>
          + <th class="text-left" style="width: 50%">Description</th>
          + </tr>
          + </thead>
          + <tbody>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsSucceeded</td>
          + <td>Kafka offset commit success count if Kafka commit is turned on.</td>
          + </tr>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsFailed</td>
          + <td>Kafka offset commit failure count if Kafka commit is turned on.</td>
          — End diff –

          Same here.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4187#discussion_r124235516

          — Diff: docs/monitoring/metrics.md —
          @@ -867,6 +867,28 @@ Thus, in order to infer the metric identifier:
          </tbody>
          </table>

          +#### Connector:
          +<table class="table table-bordered">
          + <thead>
          + <tr>
          + <th class="text-left" style="width: 20%">Scope</th>
          + <th class="text-left" style="width: 30%">Metrics</th>
          + <th class="text-left" style="width: 50%">Description</th>
          + </tr>
          + </thead>
          + <tbody>
          + <tr>
          + <th rowspan="1">Slot/Consumer</th>
          + <td>kafkaCommitsSucceeded</td>
          + <td>Kafka offset commit success count if Kafka commit is turned on.</td>
          — End diff –

          I would make this statement stricter: the metric only exists if committing is turned on AND checkpointing is enabled. The added metrics would not appear if only Kafka's periodic offset committing is turned on.
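The condition in this comment can be written as a small decision table. A minimal sketch with three inputs (Kafka auto-commit enabled, commit-on-checkpoint enabled, checkpointing enabled). It assumes that, when checkpointing is enabled, the checkpoint setting decides and Kafka's auto-commit is ignored. The enum `CommitMode` and the class `CommitModeRules` are hypothetical and only encode the rule stated here, not the connector's actual classes.

```java
/** Hypothetical model of how offsets are committed back to Kafka. */
enum CommitMode {
    DISABLED,
    ON_CHECKPOINTS,
    KAFKA_PERIODIC
}

final class CommitModeRules {

    private CommitModeRules() {}

    /** Resolves the commit mode; with checkpointing enabled, Kafka's auto-commit setting is ignored (assumption). */
    static CommitMode resolve(boolean autoCommitEnabled, boolean commitOnCheckpointsEnabled, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            return commitOnCheckpointsEnabled ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        return autoCommitEnabled ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }

    /** Per this review comment, the commit counters only apply to checkpoint-driven commits. */
    static boolean commitMetricsApply(CommitMode mode) {
        return mode == CommitMode.ON_CHECKPOINTS;
    }
}
```

Under these assumptions, a job with only Kafka's periodic auto-commit resolves to `KAFKA_PERIODIC`, for which `commitMetricsApply` returns false, matching the documentation wording requested above.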

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4187

          Hi @zhenzhongxu, thanks for the PR. I like the idea of exposing these metrics.

          Regarding the metric naming:
          I think we also expose metrics that are directly available through the Kafka client.
          Apart from this PR, there was also previous discussion about adding other Flink-specific metrics to the connector (e.g., offset lag since last checkpoint).
          I wonder whether or not we should regulate the naming scheme for these Flink-specific metrics, to set them apart from Kafka's provided metrics. What do you think?
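One way to set Flink-specific metrics apart from the ones forwarded from the Kafka client (which already use hyphenated names such as "records-lag-max" or "commit-rate") is a reserved component prefix, following the 'kafkaconnector-commits-succeeded' (component-metric-name) example proposed in this thread. A minimal sketch; `ConnectorMetricNames` and the prefix value are hypothetical and only illustrate the scheme.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical helper for a "component-metric-name" naming scheme. */
final class ConnectorMetricNames {

    /** Prefix reserved for Flink-specific connector metrics (assumed value, taken from the example in the thread). */
    static final String COMPONENT = "kafkaconnector";

    /** Matches a lower-case letter or digit followed by an upper-case letter, i.e. a camelCase boundary. */
    private static final Pattern CAMEL_BOUNDARY = Pattern.compile("([a-z0-9])([A-Z])");

    private ConnectorMetricNames() {}

    /** Converts a camelCase name such as "commitsSucceeded" to "commits-succeeded". */
    static String hyphenate(String camelCase) {
        return CAMEL_BOUNDARY.matcher(camelCase).replaceAll("$1-$2").toLowerCase(Locale.ROOT);
    }

    /** Builds the full metric name, e.g. "kafkaconnector-commits-succeeded". */
    static String of(String metric) {
        return COMPONENT + "-" + hyphenate(metric);
    }
}
```

With a dedicated prefix, a Flink-specific name cannot collide with a Kafka-provided one. An alternative would be to register the Flink-specific metrics in their own metric subgroup instead of encoding the distinction in the name.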

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4187

          Please resolve the checkstyle violations:
          ```

          [ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[96] (javadoc) JavadocStyle: First sentence should end with a period.
          [ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[99] (javadoc) JavadocStyle: First sentence should end with a period.
          ```

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhenzhongxu opened a pull request:

          https://github.com/apache/flink/pull/4187

          FLINK-6998 [Kafka Connector] Add kafka offset commit metrics in cons…

          Add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in KafkaConsumerThread class.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhenzhongxu/flink FLINK-6998

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4187.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4187


          commit 7d3f0af6511d732c3cc2cb5231d76f9e8a12e684
          Author: Zhenzhong Xu <zxu@netflix.com>
          Date: 2017-06-26T22:51:09Z

          FLINK-6998 [Kafka Connector] Add kafka offset commit metrics in consumer callback



            People

            • Assignee: Zhenzhong Xu
              Reporter: Zhenzhong Xu