  Flink / FLINK-7287

test instability in Kafka010ITCase.testCommitOffsetsToKafka

    Details

      Description

      Kafka010ITCase.testCommitOffsetsToKafka seems to fail sporadically, e.g.:

      ================================================================================
      Test testCommitOffsetsToKafka(org.apache.flink.streaming.connectors.kafka.Kafka010ITCase) is running.
      --------------------------------------------------------------------------------
      12:29:31,597 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase     - 
      ===================================
      == Writing sequence of 50 into testCommitOffsetsToKafkaTopic with p=3
      ===================================
      12:29:31,597 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase     - Writing attempt #1
      12:29:31,598 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - Creating topic testCommitOffsetsToKafkaTopic-1
      12:29:31,598 INFO  org.I0Itec.zkclient.ZkEventThread                             - Starting ZkClient event thread.
      12:29:31,599 INFO  org.I0Itec.zkclient.ZkClient                                  - Waiting for keeper state SyncConnected
      12:29:31,601 INFO  org.I0Itec.zkclient.ZkClient                                  - zookeeper state changed (SyncConnected)
      12:29:31,615 INFO  org.I0Itec.zkclient.ZkEventThread                             - Terminate ZkClient event thread.
      12:29:31,719 INFO  org.I0Itec.zkclient.ZkEventThread                             - Starting ZkClient event thread.
      12:29:31,722 INFO  org.I0Itec.zkclient.ZkClient                                  - Waiting for keeper state SyncConnected
      12:29:31,728 INFO  org.I0Itec.zkclient.ZkClient                                  - zookeeper state changed (SyncConnected)
      12:29:31,729 INFO  org.I0Itec.zkclient.ZkEventThread                             - Terminate ZkClient event thread.
      12:29:31,832 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Starting FlinkKafkaProducer (3/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
      12:29:31,840 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Starting FlinkKafkaProducer (2/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
      12:29:31,842 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
      12:29:31,844 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Starting FlinkKafkaProducer (1/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
      12:29:31,844 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
      12:29:31,846 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
      12:29:31,998 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase     - Finished writing sequence
      12:29:31,998 INFO  org.apache.flink.streaming.connectors.kafka.KafkaTestBase     - Validating sequence
      12:29:32,123 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,129 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,136 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,139 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=1}]
      12:29:32,154 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 2 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=0}]
      12:29:32,236 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 1 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=2}]
      12:29:32,496 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,507 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,521 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No restore state for FlinkKafkaConsumer.
      12:29:32,531 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 1 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=2}]
      12:29:32,535 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 2 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=0}]
      12:29:32,628 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=1}]
      12:29:33,017 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ITCase    - 
      --------------------------------------------------------------------------------
      Test testCommitOffsetsToKafka(org.apache.flink.streaming.connectors.kafka.Kafka010ITCase) failed with:
      java.lang.RuntimeException: Job failed with an exception
      	at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runCommitOffsetsToKafka(KafkaConsumerTestBase.java:251)
      	at org.apache.flink.streaming.connectors.kafka.Kafka010ITCase.testCommitOffsetsToKafka(Kafka010ITCase.java:162)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
      	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:921)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      

      https://s3.amazonaws.com/archive.travis-ci.org/jobs/258046274/log.txt?X-Amz-Expires=30&X-Amz-Date=20170727T155552Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170727/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=0b629a0c2b5daedc65c8aa8eb3293bc956f8fd61dc70066a051ed5878b004dbf


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/4414

          FLINK-7287 [tests] fix test instabilities in KafkaConsumerTestBase

            What is the purpose of the change

          Fix test instabilities in `KafkaConsumerTestBase`.

            Brief change log
          • Properly ignore the `JobCancellationException` (several tests should always have failed, but did not because they never waited for the job execution thread to finish).
          • Always wait for the job execution thread to finish before checking the result and returning from the test. Although this is not strictly necessary for the tests, it may uncover hidden failures that occur between the cancel command and the actual cancellation, and it lets subsequent tests run on a clean cluster without interfering jobs.
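          A minimal sketch of the first change-log item, in plain Java. It assumes the cancellation may reach the test either directly or wrapped in another exception (the stack trace above shows a `RuntimeException` caused by a `JobCancellationException`), so the hypothetical helper `isCancellation` walks the whole cause chain instead of inspecting a single level. `JobCancellationException` here is a local stand-in for `org.apache.flink.runtime.client.JobCancellationException`; none of this is Flink's actual test code.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellationCheck {

    /** Hypothetical stand-in for org.apache.flink.runtime.client.JobCancellationException. */
    static class JobCancellationException extends Exception {
        JobCancellationException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical helper: returns true if the throwable, or any exception in its
     * cause chain, is a JobCancellationException. The "seen" list guards against
     * cyclic cause chains.
     */
    static boolean isCancellation(Throwable t) {
        List<Throwable> seen = new ArrayList<>();
        while (t != null && !seen.contains(t)) {
            if (t instanceof JobCancellationException) {
                return true;
            }
            seen.add(t);
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable direct = new JobCancellationException("Job was cancelled.");
        Throwable wrapped = new RuntimeException("Job failed with an exception", direct);
        Throwable unrelated = new IllegalStateException("offset mismatch");

        System.out.println(isCancellation(direct));    // true
        System.out.println(isCancellation(wrapped));   // true
        System.out.println(isCancellation(unrelated)); // false
        System.out.println(isCancellation(null));      // false
    }
}
```

          Checking only `t.getCause()` would miss the direct case, and checking only `t` would miss the wrapped case; walking the chain covers both.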

            Verifying this change

          This change is already covered by existing tests extending `KafkaConsumerTestBase`, such as `Kafka010ITCase`.

            Does this pull request potentially affect one of the following parts:
          • Dependencies (does it add or upgrade a dependency): (no)
          • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
          • The serializers: (no)
          • The runtime per-record code paths (performance sensitive): (no)
          • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
            Documentation
          • Does this pull request introduce a new feature? (no)
          • If yes, how is the feature documented? (not applicable)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-7287

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4414.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4414


          commit 31f672171bd44f86c4bd31bd383077141840e2a6
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-07-27T16:31:13Z

          FLINK-7287 [tests] fix checks ignoring a JobCancellationException

          commit db4ec9f1237e704b761404b004a25df8f924e546
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-07-27T16:32:14Z

          FLINK-7287 [tests] fix main test threads not waiting for the job execution thread to finish


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4414

          Thanks for the fix and your notice on the instabilities @NicoK!

          Could you check whether my understanding of the issue is correct?

          • `Kafka010ITCase.testCommitOffsetsToKafka` was failing on `JobCancellationException`, whereas that exception signals success and should be ignored.
          • We were "incorrectly passing" before only because we didn't properly wait for the job to be cancelled.
          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/4414

          Yes, both your observations are correct. The pattern in most tests inside `KafkaConsumerTestBase` is to
          1) start a job,
          2) do some processing (e.g., until some offset or another finish condition is reached), including some checks,
          3) cancel the job, and
          4) check for exceptions.

          Since cancelling the job raises a `JobCancellationException`, that exception must be ignored.
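          The four-step pattern above can be sketched in plain Java. This is a minimal, self-contained simulation, not Flink's test code: the "job" is a hypothetical loop running in a separate thread, `JobCancellationException` is a local stand-in for Flink's class, and `runJob`/`runTest` are illustrative names. The `failOnCancel` flag simulates a failure between the cancel command and the actual cancellation; the test observes it deterministically only because it joins the execution thread before reading `errorRef`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class CancelAndJoinPattern {

    /** Hypothetical stand-in for Flink's JobCancellationException. */
    static class JobCancellationException extends Exception {
        JobCancellationException(String message) {
            super(message);
        }
    }

    /** Hypothetical job: processes records until cancelled, then ends by throwing. */
    static void runJob(AtomicBoolean cancelled, CountDownLatch processed, boolean failOnCancel)
            throws Exception {
        while (!cancelled.get()) {
            processed.countDown(); // simulate processing one record
            Thread.sleep(1);
        }
        if (failOnCancel) {
            // simulated hidden failure between the cancel command and the actual cancellation
            throw new IllegalStateException("failure during cancellation");
        }
        throw new JobCancellationException("Job was cancelled.");
    }

    /** Returns the unexpected error recorded by the job thread, or null if the test passed. */
    static Throwable runTest(boolean failOnCancel) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        CountDownLatch processed = new CountDownLatch(10);
        AtomicReference<Throwable> errorRef = new AtomicReference<>();

        // 1) start the job in a separate execution thread
        Thread runner = new Thread(() -> {
            try {
                runJob(cancelled, processed, failOnCancel);
            } catch (JobCancellationException e) {
                // expected: cancelling the job raises this, so it is ignored
            } catch (Throwable t) {
                errorRef.set(t);
            }
        });
        runner.start();

        try {
            processed.await();     // 2) wait until the finish condition is reached
            cancelled.set(true);   // 3) cancel the job
            runner.join();         // wait for the execution thread before checking errorRef
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the job", e);
        }
        return errorRef.get();     // 4) check for exceptions
    }

    public static void main(String[] args) {
        System.out.println(runTest(false)); // null: the cancellation was ignored
        System.out.println(runTest(true));  // java.lang.IllegalStateException: failure during cancellation
    }
}
```

          Without `runner.join()`, step 4 could read `errorRef` before the execution thread has recorded anything, so the check would pass by accident, which matches the "incorrectly passing" behaviour discussed above.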

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4414

          Thanks for clarifying!
          +1 to merge this to `master` and `release-1.3`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4414

          Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4414

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for the contribution Nico.
          Fixed for master via f0d4a772f44e95b52d58284b85074cb04d2796ab.
          Fixed for release-1.3 via 452f5d1032f61a29726dd484453a256c7d57d052.


            People

            • Assignee: Nico Kruber (NicoK)
            • Reporter: Nico Kruber (NicoK)
            • Votes: 0
            • Watchers: 3
