Flink / FLINK-11250

Fix thread leak when StreamTask switches from DEPLOYING to CANCELING


Details

    Description

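      Since Flink 1.5.x, streamRecordWriters are created in StreamTask's constructor, which starts an OutputFlusher daemon thread. If the task switches from DEPLOYING to CANCELING after its invokable has been instantiated but before it runs, nothing stops this thread, so it is leaked. In the log below, the OutputFlusher keeps running every 100 ms after the task has already reached CANCELED.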
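The following is a minimal, self-contained sketch of the leak pattern described above, in plain Java without Flink. The names `FlushingWriter`, `LeakyTask` and `runTask` are hypothetical stand-ins for StreamRecordWriter, StreamTask and the task runner. They are not Flink's real classes. The flusher interval of 100 ms mirrors the `timeout: 100` in the log. The close-on-cancel branch only illustrates one possible remedy: releasing what the constructor started when invoke() is skipped. It is not necessarily how this issue was fixed in Flink.

```java
public class FlusherLeakDemo {

    /** Hypothetical stand-in for StreamRecordWriter: starts a periodic flusher thread in its constructor. */
    static final class FlushingWriter implements AutoCloseable {
        private final Thread flusher;
        private volatile boolean running = true;

        FlushingWriter(String taskName, long timeoutMillis) {
            flusher = new Thread(() -> {
                while (running) {
                    try {
                        Thread.sleep(timeoutMillis);
                    } catch (InterruptedException e) {
                        return; // close() interrupted the thread: stop flushing
                    }
                    // A real writer would flush its buffered records here.
                }
            }, "OutputFlusher for " + taskName);
            flusher.setDaemon(true); // daemon: never blocks JVM exit, so a leak is easy to overlook
            flusher.start();
        }

        boolean isFlusherAlive() {
            return flusher.isAlive();
        }

        @Override
        public void close() throws InterruptedException {
            running = false;
            flusher.interrupt();
            flusher.join(); // wait until the flusher thread has actually exited
        }
    }

    /** Hypothetical task: the writer is created in the constructor but only closed at the end of invoke(). */
    static final class LeakyTask {
        final FlushingWriter writer;

        LeakyTask(String name) {
            writer = new FlushingWriter(name, 100);
        }

        void invoke() throws InterruptedException {
            try {
                // ... process records ...
            } finally {
                writer.close();
            }
        }
    }

    /** Simulates the runner: instantiate the task, then skip invoke() if it was canceled meanwhile. */
    static LeakyTask runTask(boolean canceledBeforeInvoke, boolean closeOnCancel) throws InterruptedException {
        LeakyTask task = new LeakyTask("Source: Custom Source");
        if (canceledBeforeInvoke) {
            if (closeOnCancel) {
                task.writer.close(); // possible remedy: release what the constructor started
            }
            return task; // without the remedy, the flusher thread outlives the task
        }
        task.invoke();
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        LeakyTask leaked = runTask(true, false);
        LeakyTask fixed = runTask(true, true);
        System.out.println("leaked flusher alive: " + leaked.writer.isFlusherAlive());
        System.out.println("fixed flusher alive: " + fixed.writer.isFlusherAlive());
        leaked.writer.close(); // clean up the demo's own leak before exiting
    }
}
```

Running `main` prints `leaked flusher alive: true` followed by `fixed flusher alive: false`. The normal path (`runTask(false, false)`) also stops the thread, because `invoke()` closes the writer in its `finally` block. Only the canceled-before-invoke path leaks.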


      Reproducible example:

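      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;

      public class OutputFlusherLeakRepro {

          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(5000);
              env
                      .addSource(new SourceFunction<String>() {
                          private volatile boolean running = true;

                          @Override
                          public void run(SourceContext<String> ctx) throws Exception {
                              // Emit a slow, long-running stream of records.
                              for (int i = 0; i < 100000000 && running; i++) {
                                  Thread.sleep(100);
                                  ctx.collect("data " + i);
                              }
                          }

                          @Override
                          public void cancel() {
                              // Let run() exit when the task is canceled.
                              running = false;
                          }
                      })
                      .addSink(new RichSinkFunction<String>() {

                          @Override
                          public void open(Configuration parameters) throws Exception {
                              // Deliberately throw ArithmeticException so the job fails and the
                              // source task is canceled while it is still DEPLOYING (see the log below).
                              System.out.println(1 / 0);
                          }

                          @Override
                          public void invoke(String value, Context context) throws Exception {
                              // No-op: the sink never gets past open().
                          }

                      }).setParallelism(2);

              env.execute();
          }
      }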

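      Relevant log output. The OutputFlusher thread started at 03:03:47.544 keeps running every ~100 ms after the task switched to CANCELED at 03:03:47.547: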

      2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
      2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] slotpool.SlotPool#allocateSlot:326 Received slot request [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED]
      2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] in slot [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}].
      2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi task slot [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] for group bc764cd8ddf7a0cff126f51c16239658.
      2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi task slot [SlotRequestId{494e47eb8318e2c09999a1db91dda6b8}] for group 0a448493b4782967b150582570326227.
      2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}].
      2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group 0a448493b4782967b150582570326227.
      2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING.
      2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) (attempt #1) to localhost
      2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/tmp/localState/aid_AllocationID{7b5faad9073d7fac6759e40981197b8d}], jobID=06e76f6e31728025b22fdda9fadd6f01, jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 06e76f6e31728025b22fdda9fadd6f01 - bc764cd8ddf7a0cff126f51c16239658 - 0 under allocation id AllocationID{7b5faad9073d7fac6759e40981197b8d}.
      2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] partition.ResultPartition#<init>:172 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef): Initialized ResultPartition 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references]
      2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] taskexecutor.TaskExecutor#submitTask:541 Received task Source: Custom Source (1/1).
      2019-01-02 03:03:47.532 [thread==> Source: Custom Source (1/1)] taskmanager.Task#transitionState:992 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to DEPLOYING.
      2019-01-02 03:03:47.532 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:655 Creating FileSystem stream leak safety net for task Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING]
      2019-01-02 03:03:47.532 [thread==> flink-akka.actor.default-dispatcher-2] state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:166 Found existing local state store for 06e76f6e31728025b22fdda9fadd6f01 - 0a448493b4782967b150582570326227 - 0 under allocation id AllocationID{7b5faad9073d7fac6759e40981197b8d}: org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@176e8a65
      2019-01-02 03:03:47.535 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:662 Loading JAR files for task Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.536 [thread==> Source: Custom Source (1/1)] taskmanager.Task#createUserCodeClassloader:947 Getting user code class loader for task 74a4ed4bb2f80aa2b98e11bd09ea64ef at library cache manager took 0 milliseconds
      2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:688 Registering task at network: Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] buffer.LocalBufferPool#<init>:125 Using a local buffer pool with 2-12 buffers
      2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] partition.ResultPartitionManager#registerResultPartition:62 Registered ResultPartition 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references].
      2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] network.TaskEventDispatcher#registerPartition:59 registering 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef
      2019-01-02 03:03:47.537 [thread==> flink-akka.actor.default-dispatcher-2] state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:166 Found existing local state store for 06e76f6e31728025b22fdda9fadd6f01 - 0a448493b4782967b150582570326227 - 1 under allocation id AllocationID{d8a1fd06e3e52101b003a04355f64be1}: org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@57bf2d8e
      2019-01-02 03:03:47.537 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:714 =======> next, kick off the background copying of files for the distributed cache: Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.538 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:729 =======> isCanceledOrFailed: Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.540 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:737 =======> call the user code initialization methods: Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.543 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:765 =======> now load and instantiate the task's invokable code: Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [DEPLOYING].
      2019-01-02 03:03:47.543 [thread==> flink-akka.actor.default-dispatcher-5] executiongraph.ExecutionGraph#transitionState:1356 Job Flink Streaming Job (06e76f6e31728025b22fdda9fadd6f01) switched from state RUNNING to FAILING.
      2019-01-02 03:03:47.544 [thread==> Source: Custom Source (1/1)] tasks.StreamTask#createStreamRecordWriter:1214 Using partitioner REBALANCE for output 0 of task Source: Custom Source
      2019-01-02 03:03:47.544 [thread==> Source: Custom Source (1/1)] io.StreamRecordWriter$OutputFlusher#<init>:174 StreamRecordWriter start : outputflusher1546369427544, timeout: 100
      2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-5] executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from DEPLOYING to CANCELING.
      2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-2] taskmanager.Task#cancelExecution:1055 Attempting to cancel task Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef).
      2019-01-02 03:03:47.544 [thread==> flink-akka.actor.default-dispatcher-2] taskmanager.Task#cancelOrFailAndCancelInvokable:1078 cancelOrFailAndCancelInvokable_current_execution_state==> task: Source: Custom Source (1/1), state: DEPLOYING, targetState CANCELING
      2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-2] taskmanager.Task#transitionState:992 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from DEPLOYING to CANCELING.
      2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#releaseSlot:745 Releasing slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] because: Slot is being returned to the SlotPool.
      2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#releaseSlot:745 Releasing slot [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}] because: Release multi task slot because all children have been released.
      2019-01-02 03:03:47.545 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#tryFulfillSlotRequestOrMakeAvailable:842 Adding returned slot [AllocationID{d8a1fd06e3e52101b003a04355f64be1}] to available slots
      2019-01-02 03:03:47.546 [thread==> flink-akka.actor.default-dispatcher-2] taskexecutor.TaskExecutor#failPartition:656 Discarding the results produced by task execution 8aeff98e78ed3d348af9d4d5679f2b26.
      2019-01-02 03:03:47.546 [thread==> flink-akka.actor.default-dispatcher-2] partition.ResultPartitionManager#releasePartitionsProducedBy:105 Released all partitions produced by 8aeff98e78ed3d348af9d4d5679f2b26.
      2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] taskmanager.Task#transitionState:992 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CANCELING to CANCELED.
      2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:892 Freeing task resources for Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef).
      2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] network.NetworkEnvironment#unregisterTask:263 Unregister task Source: Custom Source (1/1) from network environment (state: CANCELED).
      2019-01-02 03:03:47.547 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:47.547 [thread==> Source: Custom Source (1/1)] partition.ResultPartition#release:313 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef): Releasing ResultPartition 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef [PIPELINED_BOUNDED, 2 subpartitions, 2 pending references].
      2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] partition.PipelinedSubpartition#release:137 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef): Released PipelinedSubpartition#0 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
      2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] partition.PipelinedSubpartition#release:137 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef): Released PipelinedSubpartition#1 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false].
      2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] partition.ResultPartitionManager#releasePartitionsProducedBy:105 Released all partitions produced by 74a4ed4bb2f80aa2b98e11bd09ea64ef.
      2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] network.TaskEventDispatcher#unregisterPartition:78 unregistering 85c7415bae559d6198b8bb69d4c6e49f@74a4ed4bb2f80aa2b98e11bd09ea64ef
      2019-01-02 03:03:47.548 [thread==> Source: Custom Source (1/1)] taskmanager.Task#run:919 Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) [CANCELED]
      2019-01-02 03:03:47.549 [thread==> flink-akka.actor.default-dispatcher-2] taskexecutor.TaskExecutor#unregisterTaskAndNotifyFinalState:1337 Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source 74a4ed4bb2f80aa2b98e11bd09ea64ef.
      2019-01-02 03:03:47.562 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#releaseSlot:745 Releasing slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] because: Slot is being returned to the SlotPool.
      2019-01-02 03:03:47.566 [thread==> flink-akka.actor.default-dispatcher-3] executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CANCELING to CANCELED.
      2019-01-02 03:03:47.566 [thread==> flink-akka.actor.default-dispatcher-3] executiongraph.Execution#processFail:1079 Ignoring transition of vertex Source: Custom Source (1/1) - execution #1 to FAILED while being CANCELED.
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#releaseSlot:745 Releasing slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] because: Slot is being returned to the SlotPool.
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] executiongraph.ExecutionGraph#tryRestartOrFail:1477 Try to restart or fail the job Flink Streaming Job (06e76f6e31728025b22fdda9fadd6f01) if no longer possible.
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] executiongraph.ExecutionGraph#transitionState:1356 Job Flink Streaming Job (06e76f6e31728025b22fdda9fadd6f01) switched from state FAILING to RESTARTING.
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] executiongraph.ExecutionGraph#tryRestartOrFail:1487 Restarting the job Flink Streaming Job (06e76f6e31728025b22fdda9fadd6f01).
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#releaseSlot:745 Releasing slot [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}] because: Release multi task slot because all children have been released.
      2019-01-02 03:03:47.567 [thread==> flink-akka.actor.default-dispatcher-3] slotpool.SlotPool#tryFulfillSlotRequestOrMakeAvailable:842 Adding returned slot [AllocationID{7b5faad9073d7fac6759e40981197b8d}] to available slots
      2019-01-02 03:03:47.647 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:47.748 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:47.848 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:47.948 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.049 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.149 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.249 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.349 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.450 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
      2019-01-02 03:03:48.550 [thread==> OutputFlusher for Source: Custom Source] io.StreamRecordWriter$OutputFlusher#run:192 Running OutputFlusher outputflusher1546369427544
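Besides reading the log, a leak like this can be checked from inside the JVM that runs the local job. One way is to list live threads whose names start with `OutputFlusher for`, the prefix seen in the log above. The sketch below uses only the standard `Thread.getAllStackTraces()`. The class and method names are hypothetical. Its `main` method starts a fake thread with the same name in place of a real leaked flusher.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class LeakedFlusherCheck {

    /** Returns the sorted names of live threads whose name starts with the given prefix. */
    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a leaked flusher: a daemon thread named like the one in the log.
        Thread fake = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // interrupted below: exit
            }
        }, "OutputFlusher for Source: Custom Source");
        fake.setDaemon(true);
        fake.start();

        System.out.println(liveThreadsWithPrefix("OutputFlusher for"));

        fake.interrupt();
        fake.join();
    }
}
```

In a test for this issue, such a check would run after the job has been canceled or restarted and expect an empty list.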

          People

            Anton Kalashnikov (akalashnikov)
            lamber-ken
            Votes: 1
            Watchers: 10

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 40m