Flink / FLINK-21028

Streaming application didn't stop properly


Details

    Description

      I have a Flink job running on YARN with a disjoint graph, i.e. a single job contains two independent and isolated pipelines.

      From time to time, I stop the job with a savepoint like so:

      flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}

      A few days ago, the job did not stop cleanly as it usually does. Instead, it appears to have run into a race condition.

      On the CLI, the stop command simply timed out:

      org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
       at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
       at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
       at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
       at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:422)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
       at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
      Caused by: java.util.concurrent.TimeoutException
       at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
       at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
       ... 9 more

       

      The root of the problem, however, is that one TaskManager threw an exception during shutdown. This led to a restart of part of the pipeline, which put it back into the RUNNING state, so the stop command on the console timed out because the job was (partially) running again. The exception in the logs, which looks like a race condition to me, is:

      2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
      java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
       at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
       at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
       at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
       at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
       at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
       ... 13 more
      Caused by: java.lang.RuntimeException
       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
       at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
       at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
       at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
       ... 25 more
      Caused by: java.lang.IllegalStateException
       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
       at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
       ... 29 more
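
      Reading the trace from the bottom up: the innermost cause is an IllegalStateException raised by Preconditions.checkState inside BufferBuilder.append, reached while TimestampsAndWatermarksOperator.close emitted a final watermark. The Java sketch below is not Flink code. SketchBuffer, its finished flag, and lateWriteIsRejected are hypothetical names, and the sketch assumes the failed check guards against appending to a buffer that has already been finished. It only illustrates how a late write during shutdown could surface as this kind of exception.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a network buffer writer; this is NOT Flink's BufferBuilder. */
class FinishedBufferSketch {

    static final class SketchBuffer {
        private final ByteBuffer target = ByteBuffer.allocate(64);
        private boolean finished;

        /** Copies as many bytes as fit; rejects any write once the buffer is finished. */
        int append(ByteBuffer source) {
            if (finished) {
                // Plays the role of the Preconditions.checkState(...) frame in the trace (assumption).
                throw new IllegalStateException("append after finish");
            }
            int n = Math.min(source.remaining(), target.remaining());
            for (int i = 0; i < n; i++) {
                target.put(source.get());
            }
            return n;
        }

        void finish() {
            finished = true;
        }
    }

    /** Returns true if a write that arrives after finish() is rejected. */
    static boolean lateWriteIsRejected() {
        SketchBuffer buffer = new SketchBuffer();
        buffer.append(ByteBuffer.wrap(new byte[] {1, 2, 3})); // normal write while running
        buffer.finish(); // assumed ordering: the buffer is finished first
        try {
            // A late emit, such as a final watermark from close(), arrives afterwards.
            buffer.append(ByteBuffer.wrap(new byte[] {4}));
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("late write rejected: " + lateWriteIsRejected());
    }
}
```

      Whether the real failure follows exactly this ordering is not something the trace alone proves. The sketch only shows why such a state check would turn a late write into a task failure instead of a clean stop.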

      I already raised this bug on the user mailing list, where the conclusion was to open a ticket here. Original thread on the user mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bugs-in-Streaming-job-stopping-Weird-graceful-stop-restart-for-disjoint-job-graph-td40610.html

      Edit:
      In Flink 1.12.x, this bug can probably lead to a corrupted data stream and all kinds of deserialization errors in the downstream task.
      The same bug can also leave any code that sleeps interruptibly (whether Flink's network stack, user code, or a third-party library) in an invalid state.
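
      To make the last point concrete, here is a minimal Java sketch of how an interrupt that lands in an interruptible sleep can leave an object half-updated. Everything in it is hypothetical (TwoStepWriter, its two flags, interruptMidWrite); it is not taken from Flink and only assumes a writer that performs two steps with an interruptible wait in between and no rollback on interruption.

```java
import java.util.concurrent.CountDownLatch;

class InterruptedSleepSketch {

    /** Hypothetical writer whose invariant is: both flags set, or neither. */
    static final class TwoStepWriter {
        volatile boolean lengthWritten;
        volatile boolean payloadWritten;
        final CountDownLatch firstStepDone = new CountDownLatch(1);

        void write() throws InterruptedException {
            lengthWritten = true;       // step 1
            firstStepDone.countDown();  // tell the caller that step 1 is complete
            Thread.sleep(60_000);       // interruptible wait, e.g. for free buffer space
            payloadWritten = true;      // step 2, skipped if the sleep is interrupted
        }

        boolean isConsistent() {
            return lengthWritten == payloadWritten;
        }
    }

    /** Starts a write, interrupts the worker after step 1, and returns the writer. */
    static TwoStepWriter interruptMidWrite() {
        TwoStepWriter writer = new TwoStepWriter();
        Thread worker = new Thread(() -> {
            try {
                writer.write();
            } catch (InterruptedException e) {
                // No rollback here: step 1 stays applied while step 2 never runs.
            }
        });
        worker.start();
        try {
            writer.firstStepDone.await(); // wait until step 1 has happened
            worker.interrupt();           // sleep() throws even if the interrupt arrives first
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller was interrupted", e);
        }
        return writer;
    }

    public static void main(String[] args) {
        TwoStepWriter writer = interruptMidWrite();
        System.out.println("consistent after interrupt: " + writer.isConsistent());
    }
}
```

      In this sketch the writer ends up with lengthWritten set and payloadWritten unset, so isConsistent() returns false. Code that is reused after such an interrupt would continue from a state it was never designed to be in.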

          People

            pnowojski Piotr Nowojski
            TheoD Theo Diefenthal