FLINK-4616

Kafka consumer doesn't store last emitted watermarks per partition in state

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 1.1.1
    • Fix Version/s: None
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

The Kafka consumer stores only the Kafka offsets in state; it does not store the last emitted watermark per partition. This can lead to wrong results when a checkpoint is restored:

Let's say our watermark is (timestamp - 10). With the following message queue, results after a checkpoint restore will differ from those during normal processing:

A (ts = 30)
B (ts = 35)
------ checkpoint goes here
C (ts = 15) – this one should be filtered as late by the next time window
D (ts = 60)

During normal processing the watermark after B is 35 - 10 = 25, so C is dropped as late. After a restore from the checkpoint, the watermark position is lost and starts over, so C is emitted as if it were on time.
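To make the example concrete, below is a minimal sketch of such a 10-unit-lag assigner, written against Flink's AssignerWithPeriodicWatermarks interface (the Event type and its timestamp field are assumptions made for illustration). The current watermark position lives only in ordinary fields of the operator instance, which is exactly the state this ticket says is not checkpointed:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event type for the example.
class Event {
    public long timestamp;
}

public class LaggingAssigner implements AssignerWithPeriodicWatermarks<Event> {

    // Highest timestamp seen so far. Initialized to MIN_VALUE + 10 so the
    // initial watermark does not underflow. It resets to this initial value
    // whenever the operator is re-created (e.g. after a restore), because
    // nothing here is part of any checkpointed state.
    private long currentMaxTimestamp = Long.MIN_VALUE + 10;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.timestamp);
        return element.timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Watermark = (max timestamp - 10), as in the example above. During
        // normal processing the watermark is 25 after B(ts = 35), so C(ts = 15)
        // is late; after a restore the maximum starts over, and C is treated
        // as being on time.
        return new Watermark(currentMaxTimestamp - 10);
    }
}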

          Activity

          aljoscha Aljoscha Krettek added a comment -

          +1, I think this would be good to have.

          githubbot ASF GitHub Bot added a comment -

          GitHub user MayerRoman opened a pull request:

          https://github.com/apache/flink/pull/3031

FLINK-4616 Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism

FLINK-4616 Kafka consumer doesn't store last emitted watermarks per partition in state.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/MayerRoman/flink FLINK_4616

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3031.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3031


          commit bb9a54903dd445aa4d0750b1a0d6d1d592ab891f
          Author: Roman Maier <roman_maier@epam.com>
          Date: 2016-12-20T07:28:12Z

          FLINK-4616 Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism


          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman commented on the issue:

          https://github.com/apache/flink/pull/3031

I think the changes I propose rule out restoring from checkpoints created before my code changes.

Because now it saves ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> (partition + offset + watermark),
whereas before it saved ListState<Tuple2<KafkaTopicPartition, Long>> (partition + offset).

(I mean checkpoints from versions later than 1.1. The recently added backward compatibility with 1.1 snapshots is taken into account in my commit; I think that part is ok.)

Please advise me how to preserve backward compatibility.

I have some ideas for how to implement it:

1) somehow check the type of the object returned from stateStore.getSerializableListState(..)
in the initializeState method
https://github.com/apache/flink/pull/3031/files?diff=unified#diff-06bf4a7f73d98ef91309154654563475R321

i.e., whether it is
ListState<Tuple2<KafkaTopicPartition, Long>>
or
ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>>

2) use a separate state object for saving the watermark.

Or perhaps it needs to be implemented in a different way.

          I would be grateful for help.
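As a minimal sketch of option 2, assuming the same OperatorStateStore.getSerializableListState(..) API the PR already uses (the state names and the restoredWatermarks map below are made up for illustration): registering the watermark as a second, separate list state leaves the type of the original offsets state untouched, so a pre-change checkpoint restores into an empty watermark state instead of failing:

// In initializeState(..); state names here are hypothetical.
ListState<Tuple2<KafkaTopicPartition, Long>> offsetsState =
    stateStore.getSerializableListState("topic-partition-offsets");
ListState<Tuple2<KafkaTopicPartition, Long>> watermarksState =
    stateStore.getSerializableListState("topic-partition-watermarks");

// A checkpoint written before this change simply yields an empty iterable
// here, and the consumer can fall back to Long.MIN_VALUE watermarks.
Map<KafkaTopicPartition, Long> restoredWatermarks = new HashMap<>();
for (Tuple2<KafkaTopicPartition, Long> wm : watermarksState.get()) {
    restoredWatermarks.put(wm.f0, wm.f1);
}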

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3031

          Thank you for the contribution @MayerRoman. Just want to let you know that I've noticed this PR, and I think the issue is definitely something we want to fix. I'll allocate some time this week to review the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman commented on the issue:

          https://github.com/apache/flink/pull/3031

          Thank you Tzu-Li Tai.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97935720

— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
(Parts of this quoted hunk were mangled by JIRA's wiki-macro rendering; lost fragments are marked with "…".)

@@ -175,34 +176,115 @@ protected AbstractFetcher(
 // ------------------------------------------------------------------------

 /**
-  * Takes a snapshot of the partition offsets.
+  * Takes a snapshot of the partition offsets and watermarks.
  *
  * <p>Important: This method must be called under the checkpoint lock.
  *
-  * @return A map from partition to current offset.
+  * @return A map from partition to current offset and watermark.
  */
- public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
  // this method assumes that the checkpoint lock is held
  assert Thread.holdsLock(checkpointLock);

- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-  state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+ HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+ switch (timestampWatermarkMode) {
+
+  case NO_TIMESTAMPS_WATERMARKS: {
+   for (KafkaTopicPartitionState<KPH> partition : …
+
+  case PERIODIC_WATERMARKS: {
+   KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+    (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+   for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : …
+
+  case PUNCTUATED_WATERMARKS: {
+   KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+    (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+   for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : …
+
+  default:
+   // cannot happen, add this as a guard for the future
+   throw new RuntimeException();
  }

  return state;
 }

 /**
-  * Restores the partition offsets.
+  * Restores the partition offsets and watermarks.
  *
-  * @param snapshotState The offsets for the partitions
+  * @param snapshotState The offsets and watermarks for the partitions
  */
- public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-  for (KafkaTopicPartitionState<?> partition : allPartitions) {
-   Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-   if (offset != null) {
-    partition.setOffset(offset);
+ public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+  switch (timestampWatermarkMode) {
+
+   case NO_TIMESTAMPS_WATERMARKS: {
+    for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
+     Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+     if (offset != null) {
+      partition.setOffset(offset);
+     }
+    }
+    break;
+   }
+
+   case PERIODIC_WATERMARKS: {
+    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+     (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+    for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : partitions) {
+     Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+     if (offset != null) {
+      partition.setOffset(offset);
+     }
+
+     Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
+     if (watermarkTimestamp != null) {
+      partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
+     }
+    }
+    break;
+   }
+
+   case PUNCTUATED_WATERMARKS: {
+    KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+     (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+    for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : partitions) {
+     Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+     if (offset != null) {
+      partition.setOffset(offset);
+     }
+
+     Long watermarkTimestamp = snapshotState.get(partition.getKafkaTopicPartition()).f1;
+     if (watermarkTimestamp != null) {
+      partition.setCurrentWatermarkTimestamp(watermarkTimestamp);
+     }
+    }
+    break;
+   }
+
+   default:
+    // cannot happen, add this as a guard for the future
+    throw new RuntimeException();

— End diff —

          Would be good to have a reason message here.
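For instance, the guard could name the mode it failed on; the exact wording below is only an illustration, not from the PR:

default:
    throw new RuntimeException(
        "Unexpected timestamp/watermark mode: " + timestampWatermarkMode);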

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97935696

— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
(Parts of this quoted hunk were mangled by JIRA's wiki-macro rendering; lost fragments are marked with "…".)

@@ -175,34 +176,115 @@ protected AbstractFetcher(
 // ------------------------------------------------------------------------

 /**
-  * Takes a snapshot of the partition offsets.
+  * Takes a snapshot of the partition offsets and watermarks.
  *
  * <p>Important: This method must be called under the checkpoint lock.
  *
-  * @return A map from partition to current offset.
+  * @return A map from partition to current offset and watermark.
  */
- public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
  // this method assumes that the checkpoint lock is held
  assert Thread.holdsLock(checkpointLock);

- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-  state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+ HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+ switch (timestampWatermarkMode) {
+
+  case NO_TIMESTAMPS_WATERMARKS: {
+   for (KafkaTopicPartitionState<KPH> partition : …
+
+  case PERIODIC_WATERMARKS: {
+   KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+    (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) allPartitions;
+
+   for (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> partition : …
+
+  case PUNCTUATED_WATERMARKS: {
+   KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+    (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) allPartitions;
+
+   for (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> partition : …
+
+  default:
+   // cannot happen, add this as a guard for the future
+   throw new RuntimeException();

— End diff —

          Would be good to have a reason message here.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97757847

— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
@@ -101,7 +101,7 @@
  /** The assigner is kept in serialized form, to deserialize it into multiple copies */
  private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

- private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+ private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
— End diff —

          I think we should switch to have a specific checkpointed state object instead of continuing to "extend" the original Tuple. This will also be helpful for compatibility for any future changes to the checkpointed state.
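As an illustration, such a dedicated state class might look like the sketch below (the class name and accessors are assumptions, not taken from the PR). A later schema change then touches only this class instead of widening a nested Tuple2 signature:

// Hypothetical checkpointed-state holder for one Kafka partition.
public class KafkaPartitionOffsetAndWatermark implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private final KafkaTopicPartition partition;
    private final long offset;
    private final long watermarkTimestamp;

    public KafkaPartitionOffsetAndWatermark(
            KafkaTopicPartition partition, long offset, long watermarkTimestamp) {
        this.partition = partition;
        this.offset = offset;
        this.watermarkTimestamp = watermarkTimestamp;
    }

    public KafkaTopicPartition getPartition() { return partition; }
    public long getOffset() { return offset; }
    public long getWatermarkTimestamp() { return watermarkTimestamp; }
}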

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97935636

— Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java —
@@ -175,34 +176,115 @@ protected AbstractFetcher(
 // ------------------------------------------------------------------------

 /**
-  * Takes a snapshot of the partition offsets.
+  * Takes a snapshot of the partition offsets and watermarks.
  *
  * <p>Important: This method must be called under the checkpoint lock.
  *
-  * @return A map from partition to current offset.
+  * @return A map from partition to current offset and watermark.
  */
- public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+ public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
  // this method assumes that the checkpoint lock is held
  assert Thread.holdsLock(checkpointLock);

- HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
- for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-  state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+ HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+ switch (timestampWatermarkMode) {
+
+  case NO_TIMESTAMPS_WATERMARKS: {
+
+   for (KafkaTopicPartitionState<KPH> partition : allPartitions) {
— End diff —

          Excessive empty line above this line.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97758477

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception
   LOG.debug("snapshotState() called on closed source");
  } else {

-  offsetsStateForCheckpoint.clear();
+  offsetsAndWatermarksStateForCheckpoint.clear();

   final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
   if (fetcher == null) {
    // the fetcher has not yet been initialized, which means we need to return the
-   // originally restored offsets or the assigned partitions
+   // originally restored offsets and watermarks or the assigned partitions
-   if (restoreToOffset != null) {
+   if (restoreToOffsetAndWatermark != null) {
-    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-     offsetsStateForCheckpoint.add(
-      Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+    for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+     offsetsAndWatermarksStateForCheckpoint.add(
+      Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
— End diff —

          Having a specific checkpoint state object will also be helpful for code readability in situations like this one (it's quite tricky to understand quickly what the key / value refers to, as well as some of the `f0`, `f1` calls in other parts of the PR. I know the previous code used `f0` and `f1` also, but I think it's a good opportunity to improve that).
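A short usage contrast, reusing the hypothetical state class sketched above:

// Nested tuples force the reader to remember the field layout:
Long watermark = snapshotState.get(partition.getKafkaTopicPartition()).f1;

// A dedicated state class makes the intent self-documenting:
long watermarkTimestamp = stateEntry.getWatermarkTimestamp();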

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3031#discussion_r97935971

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java —
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
  return partitionWatermark;
 }

+ void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
— End diff —

          The other methods seem to be `public` (although they can actually be package-private). Should we stay consistent with that here?
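A sketch of the setter aligned with the public visibility of its neighbours (the field name follows the getter quoted above):

public void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
    this.partitionWatermark = watermarkTimestamp;
}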

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3031

A re-clarification about backwards compatibility for the state type change:
Currently, one limitation for keeping applications compatible across a savepoint restore is that you can't change the type of a state; otherwise the state restore will fail and the application is not compatible. The only workaround is to have another field as the new state with the new type, and somehow "encode" / "decode" the watermark state into / from the original `Tuple2<KafkaTopicPartition, Long>`. I don't think this is easily possible ...

At the same time, there was a recent discussion about letting the window operators also checkpoint watermarks. So perhaps we might not need to let the Kafka sources checkpoint watermarks in the end, if the window operators already take care of restoring the previous event time.
What I'm curious about right now is whether, in the future, redistributions of Kafka partition states across source subtasks will work well with the checkpointed watermarks in the downstream window operators. I don't think there should be a problem.

          @aljoscha can you perhaps help clarify this?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3031

          I think this PR should also include a test for the added feature.

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman commented on the issue:

          https://github.com/apache/flink/pull/3031

I hope I'm finished with another issue now, and I'm coming back to this one.

First, I want to ask a question that may make the others moot.

Tzu-Li Tai, did I understand correctly that if the discussion about letting the window operators checkpoint watermarks leads to a decision to implement this functionality in the window operators, the need to preserve watermark state in the Kafka consumer will disappear?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3031

          Hi @MayerRoman! Thank you for coming back to this issue.

I had a quick chat offline with @aljoscha about whether or not it would be reasonable to add this. Either your approach in this PR or letting window operators checkpoint watermarks would solve the issue of late elements after restore. We think user function code should be free of the responsibility of checkpointing watermarks and should simply leave that to the streaming internals (checkpointed by window operators, and perhaps also by source operators).

So, essentially, the Kafka consumer should not need to checkpoint watermarks, and we can close this PR and the JIRA ticket. Very sorry for the late discussion on this, given you had already worked on it.

Let us know what you think and whether or not you agree.

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman commented on the issue:

          https://github.com/apache/flink/pull/3031

          Hello, Tzu-Li Tai!
I think you made a good decision, and I agree that the PR and JIRA ticket can be closed.

Don't worry about the work already done; I gained good experience in the process.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3031

          Thanks. Can you please close this PR then :-D ? I'll close the JIRA.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

We decided that user function code should be free of the responsibility of checkpointing watermarks and should simply leave that to the streaming internals (checkpointed by window operators, and perhaps also by source operators). Therefore, this is not needed in the Kafka consumer.

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman commented on the issue:

          https://github.com/apache/flink/pull/3031

          Ok

          githubbot ASF GitHub Bot added a comment -

          Github user MayerRoman closed the pull request at:

          https://github.com/apache/flink/pull/3031


People

• Assignee: roman_maier Roman Maier
• Reporter: ymakhno Yuri Makhno
• Votes: 1
• Watchers: 8
