Flink / FLINK-5937

Add documentation about the task lifecycle.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Documentation
    • Labels:
      None


        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user kl0u opened a pull request:

        https://github.com/apache/flink/pull/3429

        FLINK-5937 [doc] Add documentation about the stream task lifecycle

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/kl0u/flink task-doc

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3429.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3429


        commit 34673d63af17a414d47a842720fbab3ff4b753d0
        Author: kl0u <kkloudas@gmail.com>
        Date: 2017-02-28T14:41:37Z

        FLINK-5937 [doc] Add documentation about the stream task lifecycle


        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103482237

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        --- End diff --

        "in case of a normal, ..."
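The operator lifecycle that the diff above walks through (initialization, per-element and watermark processing, then close() and dispose() on normal termination) can be illustrated with a small, self-contained Java sketch. Everything in it is hypothetical: SketchOperator, RecordingOperator and OperatorLifecycleSketch.runToCompletion are illustrative names, not Flink classes, and Flink's real StreamOperator methods take runtime-specific arguments. The sketch calls setup() before initializeState(), following the "Normal Execution" listing later in the same file (setup-operators() before initialize-operator-states()) rather than the nutshell list; the exact order is an assumption worth checking against StreamTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for an operator. NOT Flink's StreamOperator,
// whose lifecycle methods take runtime-specific arguments.
interface SketchOperator<T> {
    void setup();
    void initializeState();
    void open();
    void processElement(T element);
    void processWatermark(long watermark);
    void close();
    void dispose();
}

// Records every lifecycle call so that the order can be inspected afterwards.
class RecordingOperator implements SketchOperator<String> {
    final List<String> calls = new ArrayList<>();

    public void setup() { calls.add("setup"); }
    public void initializeState() { calls.add("initializeState"); }
    public void open() { calls.add("open"); }
    public void processElement(String element) { calls.add("processElement:" + element); }
    public void processWatermark(long watermark) { calls.add("processWatermark:" + watermark); }
    public void close() { calls.add("close"); }
    public void dispose() { calls.add("dispose"); }
}

class OperatorLifecycleSketch {
    // Drives one operator through a fault-free run over a finite input.
    static <T> void runToCompletion(SketchOperator<T> op, List<T> input, long finalWatermark) {
        op.setup();           // runtime context, metrics, ... (assumed order)
        op.initializeState(); // obtain the operator's initial state
        op.open();            // operator-specific init, e.g. opening a UDF
        for (T element : input) {
            op.processElement(element);
        }
        op.processWatermark(finalWatermark);
        op.close();           // final bookkeeping, only on normal termination
        op.dispose();         // release any resources held by the operator
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        runToCompletion(op, Arrays.asList("a", "b"), 10L);
        System.out.println(op.calls);
    }
}
```

Printing the recorded calls makes it easy to see that close() is only reached after all input and the final watermark have been processed, and that dispose() always comes last.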

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103482158

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        --- End diff --

        insert comma before "respectively"

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103483914

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        --- End diff --

        IMO, this should be briefly mentioned before
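The task-level sequence listed under "Normal Execution", together with the failure path described earlier in the same diff (execution jumps to dispose() and skips close()), can be sketched the same way. TaskInvokeSketch and its log entries are hypothetical and only echo the step names from that listing; this is a simplified model, not a line-by-line copy of StreamTask.invoke(). A failure is simulated by throwing from run(), and the sketch records it instead of propagating it, whereas a real task would be expected to report the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a task's invoke() sequence.
// Step names mirror the "Normal Execution" listing; this is not Flink code.
class TaskInvokeSketch {
    final List<String> log = new ArrayList<>();
    private final boolean failDuringRun;

    TaskInvokeSketch(boolean failDuringRun) {
        this.failDuringRun = failDuringRun;
    }

    // Retrieve the initial, task-wide state before invoke() starts.
    void setInitialState() {
        log.add("setInitialState");
    }

    void invoke() {
        log.add("load operator chain");
        try {
            log.add("setup-operators");
            log.add("task specific init");
            log.add("initialize-operator-states");
            log.add("open-operators");
            run();
            // Only reached on normal, fault-free termination.
            log.add("close-operators");
        } catch (RuntimeException e) {
            // Failure or cancellation: close-operators is skipped.
            // The sketch records the failure instead of rethrowing it.
            log.add("failed: " + e.getMessage());
        } finally {
            // Runs on both paths, so operators are always disposed.
            log.add("dispose-operators");
            log.add("task specific cleanup");
            log.add("common cleanup");
        }
    }

    private void run() {
        log.add("run");
        if (failDuringRun) {
            throw new IllegalStateException("simulated failure");
        }
    }

    public static void main(String[] args) {
        for (boolean fail : new boolean[] {false, true}) {
            TaskInvokeSketch task = new TaskInvokeSketch(fail);
            task.setInitialState();
            task.invoke();
            System.out.println((fail ? "failed run: " : "normal run: ") + task.log);
        }
    }
}
```

Placing disposal in a finally block is what makes dispose-operators the common end point of both the normal and the failed path in this model.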

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103481746

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        --- End diff --

        Runtim*E*Context

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103483353

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        — End diff –

        "calls them" -> "calls the respective methods"
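Below is a minimal Java sketch of the operator lifecycle described in the quoted diff. On fault-free termination `close()` runs before `dispose()`, and a failure jumps straight to `dispose()`. `RecordingOperator` and `LifecycleDriver` are hypothetical classes, not Flink's `StreamOperator` or `StreamTask`. The sketch calls `setup()` before `initializeState()`. That follows the task-level "Normal Execution" listing quoted in a later comment (`setup-operators()`, then `initialize-operator-states()`, then `open-operators()`), not the order of the operator method list above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator; NOT Flink's StreamOperator interface.
// It only records which lifecycle method was called, in order.
class RecordingOperator {
    final List<String> calls = new ArrayList<>();

    void setup()                   { calls.add("setup"); }
    void initializeState()         { calls.add("initializeState"); }
    void open()                    { calls.add("open"); }
    void processElement(String e)  { calls.add("processElement"); }
    void processWatermark(long ts) { calls.add("processWatermark"); }
    void close()                   { calls.add("close"); }
    void dispose()                 { calls.add("dispose"); }
}

// Hypothetical driver playing the role of the task for a single operator.
class LifecycleDriver {
    // failAt < 0: fault-free run. failAt >= 0: a failure is simulated just
    // before the element at that index would be processed.
    static void run(RecordingOperator op, List<String> input, int failAt) {
        try {
            op.setup();
            op.initializeState();
            op.open();
            for (int i = 0; i < input.size(); i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure");
                }
                op.processElement(input.get(i));
            }
            op.processWatermark(0L); // illustrative watermark after the input
            op.close();              // reached only on fault-free termination
        } catch (IllegalStateException e) {
            // Failure or cancellation: the remaining phases are skipped.
        } finally {
            op.dispose();            // called on every path to free resources
        }
    }
}
```

Putting `dispose()` in a `finally` block is one simple way to guarantee it runs on both the normal and the failure path.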

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103481621

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        — End diff –

        Does initialization also include restoring previous state?
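On this question: the quoted diff later says two things. `initializeState()` should contain the state initialization logic both for a first execution and for recovery from a failure or a savepoint. On a first run, the initial task state is empty. Below is a hypothetical sketch of that contract, where a `null` argument models the empty initial state. `CountingOperator` and its `Map`-based state are illustrative assumptions, not Flink's state API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stateful operator; the Map stands in for the state handed to
// the operator by its task. This is NOT Flink's state API.
class CountingOperator {
    private final Map<String, Long> counts = new HashMap<>();

    // restored == null models a first execution (empty initial state); a
    // non-null map models recovery from a checkpoint or a savepoint.
    void initializeState(Map<String, Long> restored) {
        counts.clear();
        if (restored != null) {
            counts.putAll(restored); // copy, so the snapshot is never mutated
        }
    }

    void processElement(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Returns a copy so that later processing does not alter the snapshot.
    Map<String, Long> snapshotState() {
        return new HashMap<>(counts);
    }

    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }
}
```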

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103482954

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        — End diff –

        "bried" -> "brief"
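The quoted paragraph describes snapshots triggered by checkpoint barriers, and this hypothetical, single-threaded sketch illustrates that idea. Each time a barrier arrives, `snapshotState()` is called and its result is stored in a stand-in for the state backend. After a failure, the latest checkpoint is restored from there. The string encoding of barriers (`"barrier:<id>"`) and the `TreeMap` backend are assumptions for illustration. The sketch takes the snapshot inline and does not model the separate checkpointing thread mentioned in the text.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model; NOT Flink's CheckpointBarrier or state backend classes.
class CheckpointingTask {
    // Stand-in for a state backend: checkpoint id -> snapshot of the state.
    final NavigableMap<Long, Long> backend = new TreeMap<>();
    private long sum; // operator state: running sum of the records seen

    // Items starting with "barrier:" model checkpoint barriers carrying an id;
    // every other item is a numeric record.
    void process(List<String> items) {
        for (String item : items) {
            if (item.startsWith("barrier:")) {
                long id = Long.parseLong(item.substring("barrier:".length()));
                backend.put(id, snapshotState());
            } else {
                sum += Long.parseLong(item);
            }
        }
    }

    long snapshotState() {
        return sum;
    }

    // On recovery, resume from the latest stored checkpoint (0 if none).
    void restoreLatest() {
        sum = backend.isEmpty() ? 0L : backend.lastEntry().getValue();
    }

    long sum() {
        return sum;
    }
}
```

Records that arrive after the last barrier are not covered by any checkpoint. After `restoreLatest()` their effect is gone, and they must be replayed from the input.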

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103486170

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        +
        +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
        +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
        +
        +If it is the first time the task is executed, the initial task state is empty.
        +
        +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
        +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
        +depending on the type of the task (`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where the necessary
        +task-wide resources are acquired. As an example, the `OneInputStreamTask` which represents a task that expects to have a single input stream, initializes the connection(s)
        +to the location(s) of the different partitions of the input stream that are relevant to the local task.
        +
        +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
        +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overriden by
        +every stateful operator and should contain the state initialization logic, both for the first time a job is executed, but also for the case when the task recovers from
        +a failure or using a savepoint.
        +
        +Now that all operators in the task have been initialized, the `open()` method of each individual operator is called by the `openAllOperators()` method of the `StreamTask`.
        +This is the place that implements all the operational initialization, such as register any retrieved timers with the timer service. A single task may be executing multiple
        +operators with one consuming the output of its predecessor. In this case, the `open()` method is called from the last operator, i.e. the one whose output is also the output
        +of the task itself, to the first. This is done so that when the first operator starts processing the task's input, all downstream operators are ready to receive its output.
        +
        +Now the task can resume execution, and operators can start processing fresh input data. This is the place where the task-specific `run()` mehtod is called. This method will
        +run until either there is no more input data (finite stream), or the task is cancelled (manually or not). Here is where the operator specific `processElement()` and
        +`processWatermark()` methods are called.
        +
        +In the case of running till completion, i.e. there is no more input data to process, after exiting from the `run()` method, the task enters its shutdown process.
        +Initially, the timer service stops registering any new timers (e.g. from fired timers that are being executed), clears all the not yet started timers, and awaits the
        +completion of currently executing ones. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by calling the `close()` method
        +of each one of them. Then any buffered output data is flushed so that they can be processed by the downstream tasks, and finally the task tries to clear all the resources
        +held by the operators by calling the `dispose()` method of each one of them. When opening the different operators, we mentioned that the order is from the last to the first.
        +Closing happens the other way round, from first to last.
        +
        +Finally, when all operators have been closed and all their resources freed, the task shuts down its timer service, performs its task-specific cleanup, e.g. clean all its
        +internal buffers, and then performs its generic task clean up which consists of closing all its output channels and cleaning any output buffers.
        +
        +*Checkpoints:* Previously we saw that during `initializeState()`, and in case of recovering from a failure, the task and all its operators and functions retrieve the
        +state that was persisted to stable storage during the last successful checkpoint before the failure. Checkpoints in Flink are performed periodically based on a user-specified
        +interval, and are performed by a different thread than that of the main task thread. This is the reason the are not included in the main phases of the task lifecycle. In a
        --- End diff --

        "This is the reason the are not" -> "That's why they are not"
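The checkpointing paragraph in this hunk says that checkpoints are taken by a thread other than the main task thread. Below is a minimal Java sketch of that arrangement. It rests on stated assumptions: `CheckpointThreadSketch`, its fields and the shared `lock` object are hypothetical and are not Flink's API. The lock stands in for whatever mechanism keeps a snapshot consistent with records that are being processed at the same time.

```java
/**
 * Hypothetical sketch (not Flink's API): the main task thread processes records
 * while a separate checkpoint thread snapshots the operator state. Both sides
 * synchronize on a shared lock, an assumption standing in for whatever keeps a
 * snapshot consistent with concurrent processing.
 */
public class CheckpointThreadSketch {

    private final Object lock = new Object();
    private long count; // operator state: number of records seen
    private long sum;   // operator state: running sum of record values

    /** Main task thread: the run() loop, one processElement() per record. */
    void run(int numRecords) {
        for (int i = 1; i <= numRecords; i++) {
            synchronized (lock) {
                // processElement(): both fields change together with respect to snapshots
                count++;
                sum += i;
            }
        }
    }

    /** Checkpoint thread: snapshotState() copies the state under the same lock. */
    long[] snapshotState() {
        synchronized (lock) {
            return new long[] {count, sum};
        }
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs the task and one concurrent checkpoint; returns {snapCount, snapSum, finalCount}. */
    public static long[] demo(int numRecords) {
        CheckpointThreadSketch task = new CheckpointThreadSketch();
        long[][] snapshot = new long[1][];
        Thread taskThread = new Thread(() -> task.run(numRecords), "task-thread");
        Thread checkpointThread =
                new Thread(() -> snapshot[0] = task.snapshotState(), "checkpoint-thread");
        taskThread.start();
        checkpointThread.start();
        joinQuietly(taskThread);
        joinQuietly(checkpointThread);
        return new long[] {snapshot[0][0], snapshot[0][1], task.snapshotState()[0]};
    }

    public static void main(String[] args) {
        long[] r = demo(100_000);
        // The snapshot is consistent if its sum equals 1 + 2 + ... + count.
        System.out.println("snapshot count=" + r[0]
                + ", consistent=" + (r[1] == r[0] * (r[0] + 1) / 2)
                + ", final count=" + r[2]);
    }
}
```

Both threads take the same lock, so the snapshot always satisfies `sum == count * (count + 1) / 2`. This holds no matter where in the input the checkpoint thread happens to interleave.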

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103485368

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        +
        +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
        +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
        +
        +If it is the first time the task is executed, the initial task state is empty.
        +
        +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
        +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
        +depending on the type of the task (`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where the necessary
        +task-wide resources are acquired. As an example, the `OneInputStreamTask` which represents a task that expects to have a single input stream, initializes the connection(s)
        +to the location(s) of the different partitions of the input stream that are relevant to the local task.
        +
        +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
        +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overriden by
        +every stateful operator and should contain the state initialization logic, both for the first time a job is executed, but also for the case when the task recovers from
        +a failure or using a savepoint.
        +
        +Now that all operators in the task have been initialized, the `open()` method of each individual operator is called by the `openAllOperators()` method of the `StreamTask`.
        +This is the place that implements all the operational initialization, such as register any retrieved timers with the timer service. A single task may be executing multiple
        +operators with one consuming the output of its predecessor. In this case, the `open()` method is called from the last operator, i.e. the one whose output is also the output
        +of the task itself, to the first. This is done so that when the first operator starts processing the task's input, all downstream operators are ready to receive its output.
        +
        +Now the task can resume execution, and operators can start processing fresh input data. This is the place where the task-specific `run()` mehtod is called. This method will
        --- End diff --

        "mehtod" -> "method"
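This hunk describes an ordering. `open()` runs from the last chained operator back to the first, and then `run()` feeds input until it is exhausted. A small Java sketch of that ordering follows. `OperatorChainSketch`, `ChainedOperator` and `invoke` are hypothetical names, not Flink classes. The close/flush/dispose tail follows the shutdown paragraph of the same draft. The order of the `dispose()` calls is a choice made by this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a task driving a chain of operators (not Flink's API). */
public class OperatorChainSketch {

    /** Minimal stand-in for a chained operator that records its lifecycle calls. */
    static final class ChainedOperator {
        final String name;
        final List<String> log;
        ChainedOperator next; // downstream operator in the same task, null for the chain tail

        ChainedOperator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void open() { log.add("open:" + name); }

        void processElement(String element) {
            log.add("element:" + name + ":" + element);
            if (next != null) next.processElement(element); // forward downstream
        }

        void processWatermark(long watermark) {
            log.add("watermark:" + name + ":" + watermark);
            if (next != null) next.processWatermark(watermark);
        }

        void close() { log.add("close:" + name); }

        void dispose() { log.add("dispose:" + name); }
    }

    /** Open last-to-first, run until input ends, close first-to-last, flush, dispose. */
    public static List<String> invoke(List<String> operatorNames, List<Object> input) {
        List<String> log = new ArrayList<>();
        List<ChainedOperator> chain = new ArrayList<>();
        for (String n : operatorNames) chain.add(new ChainedOperator(n, log));
        for (int i = 0; i + 1 < chain.size(); i++) chain.get(i).next = chain.get(i + 1);

        // open from the tail (whose output is the task's output) back to the head
        for (int i = chain.size() - 1; i >= 0; i--) chain.get(i).open();

        // run(): feed the finite task input into the head operator; Long values act as watermarks
        ChainedOperator head = chain.get(0);
        for (Object record : input) {
            if (record instanceof Long) head.processWatermark((Long) record);
            else head.processElement(String.valueOf(record));
        }

        // shutdown: close head to tail, flush buffered output, then free resources
        for (ChainedOperator op : chain) op.close();
        log.add("flush");
        for (ChainedOperator op : chain) op.dispose(); // order chosen by this sketch
        return log;
    }

    public static void main(String[] args) {
        List<String> log = invoke(Arrays.asList("map", "filter", "sink"), Arrays.<Object>asList("a", 10L));
        log.forEach(System.out::println);
    }
}
```

For the chain `map`, `filter`, `sink`, the log starts with `open:sink`, `open:filter`, `open:map`. The tail is therefore ready before the head sees its first record. The closes then follow in head-to-tail order.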

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103481296

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        --- End diff --

        What about `snapshotState()`?
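The operator lifecycle listed in this hunk has two termination paths. Normal termination calls `close()` and then `dispose()`. A failure or cancellation jumps straight to `dispose()`. A hedged Java sketch of both paths follows. The `Operator` interface and `RecordingOperator` are hypothetical stand-ins that only mirror the method names in the draft; they are not Flink's classes. The sketch calls `setup()` before `initializeState()`, following the order described in the draft's Task Lifecycle section. `snapshotState()` is left out because, per the draft, it is invoked asynchronously rather than at a fixed point in this sequence.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of one operator's lifecycle (not Flink's API). */
public class OperatorLifecycleSketch {

    /** Simplified contract mirroring the method names listed in the draft. */
    interface Operator {
        void setup();
        void initializeState();
        void open();
        void processElement(int value);
        void processWatermark(long watermark);
        void close();
        void dispose();
    }

    /** Records every call; a negative element simulates a failure during processing. */
    static final class RecordingOperator implements Operator {
        final List<String> calls = new ArrayList<>();
        public void setup() { calls.add("setup"); }
        public void initializeState() { calls.add("initializeState"); }
        public void open() { calls.add("open"); }
        public void processElement(int value) {
            if (value < 0) throw new IllegalArgumentException("bad element " + value);
            calls.add("processElement");
        }
        public void processWatermark(long watermark) { calls.add("processWatermark"); }
        public void close() { calls.add("close"); }
        public void dispose() { calls.add("dispose"); }
    }

    /** Drives one operator over a finite input and returns the recorded calls. */
    public static List<String> drive(int[] input) {
        RecordingOperator op = new RecordingOperator();
        try {
            op.setup();
            op.initializeState();
            op.open();
            for (int v : input) op.processElement(v);
            op.processWatermark(0L); // an illustrative watermark after the elements
            op.close();              // reached only on normal, fault-free termination
        } catch (RuntimeException failure) {
            op.calls.add("failed");  // the remaining phases, including close(), are skipped
        } finally {
            op.dispose();            // always called to free resources
        }
        return op.calls;
    }

    public static void main(String[] args) {
        System.out.println(drive(new int[] {1, 2}));
        System.out.println(drive(new int[] {1, -1, 2}));
    }
}
```

The second call fails on `-1`, so its recorded calls go from `processElement` straight to `failed` and `dispose`, with no `close`.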

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103482579

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        --- End diff --

        "In case of a termination..."?
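The quoted paragraph describes two termination paths. A normal end runs `close()` and then `dispose()`, while a failure or cancellation skips straight to `dispose()`. These map naturally onto `try`/`finally`. The following is a hedged Java sketch of that control flow only. `TaskFailure` and the `run` helper are hypothetical, and this is not a claim about how Flink's `StreamTask` is actually structured.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminationPathSketch {
    // Hypothetical stand-in for a failure or a manual cancellation.
    static class TaskFailure extends RuntimeException {
        TaskFailure(String message) { super(message); }
    }

    // Records the lifecycle calls of one operator into `calls`.
    // failAt < 0 means a fault-free run; otherwise processing the element at
    // index failAt throws, modelling a failure or cancellation mid-stream.
    static void run(List<String> input, int failAt, List<String> calls) {
        try {
            calls.add("setup");
            calls.add("initializeState");
            calls.add("open");
            for (int i = 0; i < input.size(); i++) {
                if (i == failAt) {
                    throw new TaskFailure("failed while processing element " + i);
                }
                calls.add("processElement(" + input.get(i) + ")");
            }
            // Only reached when the finite input ends without a failure.
            calls.add("close");
        } finally {
            // Reached on every path, so resources are always released.
            calls.add("dispose");
        }
    }

    public static void main(String[] args) {
        List<String> normal = new ArrayList<>();
        run(List.of("a", "b"), -1, normal);
        System.out.println("normal: " + normal);

        List<String> failed = new ArrayList<>();
        try {
            run(List.of("a", "b"), 1, failed);
        } catch (TaskFailure e) {
            System.out.println("job failed: " + e.getMessage());
        }
        System.out.println("on failure: " + failed);
    }
}
```

On the failure path, the recorded calls end with `dispose` and contain no `close`. The exception still propagates to the caller after the `finally` block has run.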

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103484503

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        +
        +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
        +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
        +
        +If it is the first time the task is executed, the initial task state is empty.
        +
        +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
        +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
        +depending on the type of the task (`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where the necessary
        +task-wide resources are acquired. As an example, the `OneInputStreamTask` which represents a task that expects to have a single input stream, initializes the connection(s)
        +to the location(s) of the different partitions of the input stream that are relevant to the local task.
        +
        +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
        +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overriden by
        --- End diff --

        "overriden" -> "overridden"
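An overridden `initializeState()` is expected to cover both the first execution and recovery from a checkpoint or savepoint. Here is a minimal Java sketch of that. The `CountingOperator` class is hypothetical. The restored state is modelled as a plain `Map`, with `null` meaning an empty initial state, rather than through Flink's state backend API.

```java
import java.util.HashMap;
import java.util.Map;

public class InitializeStateSketch {
    // Hypothetical stateful operator counting elements per key. The Map-based
    // "snapshot" stands in for a state backend; it is NOT Flink's state API.
    static class CountingOperator {
        private final Map<String, Long> counts = new HashMap<>();

        // restored == null models the first execution (empty initial state);
        // a non-null map models recovery from a checkpoint or a savepoint.
        void initializeState(Map<String, Long> restored) {
            counts.clear();
            if (restored != null) {
                counts.putAll(restored);
            }
        }

        void processElement(String key) {
            counts.merge(key, 1L, Long::sum);
        }

        // Returns a copy so that later processing does not mutate the snapshot.
        Map<String, Long> snapshotState() {
            return new HashMap<>(counts);
        }

        long count(String key) {
            return counts.getOrDefault(key, 0L);
        }
    }

    public static void main(String[] args) {
        CountingOperator first = new CountingOperator();
        first.initializeState(null);             // first execution: empty state
        first.processElement("x");
        first.processElement("x");
        Map<String, Long> checkpoint = first.snapshotState();
        first.processElement("x");               // happens after the snapshot

        CountingOperator recovered = new CountingOperator();
        recovered.initializeState(checkpoint);   // recovery: restore from snapshot
        recovered.processElement("x");
        System.out.println(recovered.count("x"));
    }
}
```

The same method handles both cases, and only the argument differs. Updates made after the last snapshot (the third `x` in `main`) are not part of the restored state.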

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103486754

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a bried description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        +
        +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
        +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
        +
        +If it is the first time the task is executed, the initial task state is empty.
        +
        +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
        +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
        +depending on the type of the task (`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where the necessary
        +task-wide resources are acquired. As an example, the `OneInputStreamTask` which represents a task that expects to have a single input stream, initializes the connection(s)
        +to the location(s) of the different partitions of the input stream that are relevant to the local task.
        +
        +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
        +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overriden by
        +every stateful operator and should contain the state initialization logic, both for the first time a job is executed, but also for the case when the task recovers from
        +a failure or using a savepoint.
        +
        +Now that all operators in the task have been initialized, the `open()` method of each individual operator is called by the `openAllOperators()` method of the `StreamTask`.
        +This is the place that implements all the operational initialization, such as register any retrieved timers with the timer service. A single task may be executing multiple
        +operators with one consuming the output of its predecessor. In this case, the `open()` method is called from the last operator, i.e. the one whose output is also the output
        +of the task itself, to the first. This is done so that when the first operator starts processing the task's input, all downstream operators are ready to receive its output.
        +
        +Now the task can resume execution, and operators can start processing fresh input data. This is the place where the task-specific `run()` mehtod is called. This method will
        +run until either there is no more input data (finite stream), or the task is cancelled (manually or not). Here is where the operator specific `processElement()` and
        +`processWatermark()` methods are called.
        +
        +In the case of running till completion, i.e. there is no more input data to process, after exiting from the `run()` method, the task enters its shutdown process.
        +Initially, the timer service stops registering any new timers (e.g. from fired timers that are being executed), clears all the not yet started timers, and awaits the
        +completion of currently executing ones. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by calling the `close()` method
        +of each one of them. Then any buffered output data is flushed so that they can be processed by the downstream tasks, and finally the task tries to clear all the resources
        +held by the operators by calling the `dispose()` method of each one of them. When opening the different operators, we mentioned that the order is from the last to the first.
        +Closing happens the other way round, from first to last.
        +
        +Finally, when all operators have been closed and all their resources freed, the task shuts down its timer service, performs its task-specific cleanup, e.g. clean all its
        +internal buffers, and then performs its generic task clean up which consists of closing all its output channels and cleaning any output buffers.
        +
        +*Checkpoints:* Previously we saw that during `initializeState()`, and in case of recovering from a failure, the task and all its operators and functions retrieve the
        +state that was persisted to stable storage during the last successful checkpoint before the failure. Checkpoints in Flink are performed periodically based on a user-specified
        +interval, and are performed by a different thread than that of the main task thread. This is the reason they are not included in the main phases of the task lifecycle. In a
        +nutshell, special elements called `CheckpointBarriers` are injected periodically by the source tasks of a job in the stream of input data, and travel with the actual data from
        +source to sink. A source task injects these barriers after it is in running mode, and assuming that the CheckpointCoordinator is also running. Whenever a task receives such a
        +barrier, it schedules a task to be performed by the checkpoint thread which calls the `snapshotState()` of the operators in the task. Input data can still be received by the
        +task while the checkpoint is being performed, but they are buffered and only emitted downstream after the checkpoint is successfully completed. For details on the principles
        --- End diff --

        "but they are buffered ..." -> "but they are buffered and only processed and emitted downstream..."
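The operator ordering described in the quoted diff (operators opened from the last one in the chain to the first, closed from the first to the last) can be illustrated with a small simulation. This is a minimal sketch, not Flink code: `ChainOrderSketch`, `Op` and `runChain()` are hypothetical, the helper names only borrow `openAllOperators()`/`closeAllOperators()` from the text, and the `dispose()` order (same as `close()`) is an assumption because the text does not state it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an operator chain inside one task; NOT Flink's API.
// It mirrors the ordering in the text: open() from the last operator to the
// first, close() from the first to the last. The dispose() order is assumed.
final class ChainOrderSketch {

    /** Stand-in for a chained operator that records each lifecycle call. */
    static final class Op {
        private final String name;
        private final List<String> log;

        Op(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void open()    { log.add("open:" + name); }
        void close()   { log.add("close:" + name); }
        void dispose() { log.add("dispose:" + name); }
    }

    // Tail first: every downstream operator is ready before its upstream one starts emitting.
    static void openAllOperators(List<Op> chain) {
        for (int i = chain.size() - 1; i >= 0; i--) {
            chain.get(i).open();
        }
    }

    // Head first: the reverse of the open order.
    static void closeAllOperators(List<Op> chain) {
        for (Op op : chain) {
            op.close();
        }
    }

    // Assumption of this sketch: resources are released in the same order as close().
    static void disposeAllOperators(List<Op> chain) {
        for (Op op : chain) {
            op.dispose();
        }
    }

    /** Builds a chain from the given names (head first) and returns the call log. */
    static List<String> runChain(String... names) {
        List<String> log = new ArrayList<>();
        List<Op> chain = new ArrayList<>();
        for (String name : names) {
            chain.add(new Op(name, log));
        }
        openAllOperators(chain);
        log.add("run");              // processElement()/processWatermark() calls would happen here
        closeAllOperators(chain);
        // buffered output would be flushed here, before dispose()
        disposeAllOperators(chain);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain("map", "filter", "sink"));
    }
}
```

Closing from the head presumably also means that anything an upstream operator emits from its `close()` still reaches downstream operators that are open.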

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103481871

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimeContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        --- End diff --

        "the `open()` method"?
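The single-operator lifecycle described in the document under review, including the rule that a failure or cancellation skips straight to `dispose()`, can be sketched as a tiny driver. The `Operator` interface, `LoggingOperator` and `drive()` are hypothetical and only borrow method names from the text; a failure is simplified to a `RuntimeException` thrown from `processElement()`, and the initialization order follows the document's "Normal Execution" listing (setup, then operator state initialization, then open).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-operator driver; the interface only borrows method names
// from the text and is NOT Flink's StreamOperator. Initialization order follows
// the "Normal Execution" listing: setup, initializeState, open.
final class OperatorLifecycleSketch {

    interface Operator {
        void setup();
        void initializeState();
        void open();
        void processElement(String element);
        void processWatermark(long watermark);
        void close();
        void dispose();
    }

    /** Logs every call; throws when it sees the element `failOn` (null = never fail). */
    static final class LoggingOperator implements Operator {
        final List<String> log = new ArrayList<>();
        private final String failOn;

        LoggingOperator(String failOn) {
            this.failOn = failOn;
        }

        @Override public void setup()           { log.add("setup"); }
        @Override public void initializeState() { log.add("initializeState"); }
        @Override public void open()            { log.add("open"); }

        @Override public void processElement(String element) {
            if (element.equals(failOn)) {
                throw new IllegalStateException("simulated failure on " + element);
            }
            log.add("processElement:" + element);
        }

        @Override public void processWatermark(long watermark) { log.add("processWatermark:" + watermark); }
        @Override public void close()   { log.add("close"); }
        @Override public void dispose() { log.add("dispose"); }
    }

    /** Returns true on fault-free termination; a failure skips close() and goes straight to dispose(). */
    static boolean drive(Operator op, List<String> input, long watermark) {
        boolean finishedNormally = false;
        try {
            op.setup();
            op.initializeState();
            op.open();
            for (String element : input) {
                op.processElement(element);
            }
            op.processWatermark(watermark);
            op.close();                 // reached only if nothing above threw
            finishedNormally = true;
        } catch (RuntimeException failure) {
            // failure or cancellation: intermediate phases (including close()) are skipped
        } finally {
            op.dispose();               // always runs to free resources
        }
        return finishedNormally;
    }

    public static void main(String[] args) {
        LoggingOperator failing = new LoggingOperator("b");
        drive(failing, Arrays.asList("a", "b", "c"), 10L);
        System.out.println(failing.log);
    }
}
```

Putting `dispose()` in a `finally` block is one simple way to guarantee it runs on both paths, while `close()` stays inside the `try` so a failure never reaches it.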

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103481205

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        --- End diff --

        Move checkpointing into a separate box to make clear it is not included in the regular thread?
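To illustrate the point that `snapshotState()` runs outside the regular task thread, here is a minimal sketch in which elements are processed on the calling ("task") thread while the snapshot is taken on a separate single-thread executor. Only `processElement()` and `snapshotState()` come from the text; the class names, the shared lock guarding the state, and the `run()` helper are assumptions of this sketch. The demo blocks on the snapshot future only to keep its result deterministic; in the document's description the task thread would not wait.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: processElement() runs on the calling ("task") thread,
// snapshotState() runs on a separate "checkpoint" thread. The shared lock and
// all class names are assumptions of this sketch, not Flink internals.
final class CheckpointThreadSketch {

    static final class CountingOperator {
        private final Object lock = new Object();
        private long count;                        // operator state, guarded by lock
        private volatile String snapshotThreadName;

        void processElement(String element) {
            synchronized (lock) {
                count++;
            }
        }

        long snapshotState() {
            synchronized (lock) {                  // no element is half-processed while we read
                snapshotThreadName = Thread.currentThread().getName();
                return count;
            }
        }

        long currentCount() {
            synchronized (lock) {
                return count;
            }
        }
    }

    static final class Result {
        final long snapshotCount;
        final long finalCount;
        final String snapshotThreadName;

        Result(long snapshotCount, long finalCount, String snapshotThreadName) {
            this.snapshotCount = snapshotCount;
            this.finalCount = finalCount;
            this.snapshotThreadName = snapshotThreadName;
        }
    }

    /** Processes `total` elements; a "barrier" before element index `barrierAt` triggers a snapshot on the checkpoint thread. */
    static Result run(int total, int barrierAt) {
        CountingOperator op = new CountingOperator();
        ExecutorService checkpointThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "checkpoint-thread"));
        try {
            long snapshotCount = -1;
            for (int i = 0; i < total; i++) {
                if (i == barrierAt) {
                    Callable<Long> snapshotTask = op::snapshotState;
                    Future<Long> snapshot = checkpointThread.submit(snapshotTask);
                    snapshotCount = snapshot.get();   // blocking only to keep the demo deterministic
                }
                op.processElement("element-" + i);
            }
            return new Result(snapshotCount, op.currentCount(), op.snapshotThreadName);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            checkpointThread.shutdown();
        }
    }

    public static void main(String[] args) {
        Result r = run(5, 3);
        System.out.println(r.snapshotCount + " / " + r.finalCount + " on " + r.snapshotThreadName);
    }
}
```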

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103486405

        --- Diff: docs/internals/task_lifecycle.md ---
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimeContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        +contain the logic for processing elements and watermark respectively.
        +
        +Finally, in the case of normal, fault-free termination of the operator (e.g. if the stream is finite and its end is reached), the `close()` method is called to
        +perform any final bookkeeping action required by the operator's logic, and the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`, and skips any intermediate phases between
        +the phase the operator was in when the failure happened and the `dispose` one.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above, whenever a checkpoint barrier is received.
        +Its responsibility is to store the current state of the operator to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it is going to be
        +retrieved when the job resumes execution after a failure. For a brief description of Flink's checkpointing mechanism please keep on reading, and for a more detailed discussion
        +on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Given the above brief introduction on the operator's main phases, this section describes in more detail how a task calls them during its execution on a cluster. The sequence
        +of the phases described here is mainly included in the `invoke()` method of the `StreamTask`. The remainder of this document is split into two subsections, one describing the
        +phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and (a shorter) one describing the different sequence followed in case
        +the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due to some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + setInitialState()
        + invoke()
        + Create basic utils (config, etc) and load the chain of operators
        + setup-operators()
        + task specific init()
        + initialize-operator-states()
        + open-operators()
        + run()
        + close-operators()
        + dispose-operators()
        + task specific cleanup()
        + common cleanup
        +
        +As shown above, after recovering the task configuration and initializing some important runtime parameters, the very first step for the task is to retrieve its initial,
        +task-wide state. This is done in the `setInitialState()`, and it is particularly important in two cases:
        +
        +1. when the task is recovering from a failure and restarts from the last successful checkpoint, and
        +2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
        +
        +If it is the first time the task is executed, the initial task state is empty.
        +
        +After recovering any initial state, the task goes into its `invoke()` method. There, it first initializes the operators involved in the local computation by calling
        +the `setup()` method of each one of them and then performs its task-specific initialization by calling the local `init()` method. By task-specific, we mean that
        +depending on the type of the task (`SourceTask`, `OneInputStreamTask` or `TwoInputStreamTask`, etc), this step may differ, but in any case, here is where the necessary
        +task-wide resources are acquired. As an example, the `OneInputStreamTask` which represents a task that expects to have a single input stream, initializes the connection(s)
        +to the location(s) of the different partitions of the input stream that are relevant to the local task.
        +
        +Having acquired the necessary resources, it is time for the different operators and user-defined functions to acquire their individual state from the task-wide state
        +retrieved above. This is done in the `initializeState()` method, which calls the `initializeState()` of each individual operator. This method should be overridden by
        +every stateful operator and should contain the state initialization logic, both for the first time a job is executed, but also for the case when the task recovers from
        +a failure or using a savepoint.
        +
        +Now that all operators in the task have been initialized, the `open()` method of each individual operator is called by the `openAllOperators()` method of the `StreamTask`.
        +This is the place that implements all the operational initialization, such as registering any retrieved timers with the timer service. A single task may be executing multiple
        +operators with one consuming the output of its predecessor. In this case, the `open()` method is called from the last operator, i.e. the one whose output is also the output
        +of the task itself, to the first. This is done so that when the first operator starts processing the task's input, all downstream operators are ready to receive its output.
        +
        +Now the task can resume execution, and operators can start processing fresh input data. This is the place where the task-specific `run()` method is called. This method will
        +run until either there is no more input data (finite stream), or the task is cancelled (manually or not). Here is where the operator specific `processElement()` and
        +`processWatermark()` methods are called.
        +
        +In the case of running till completion, i.e. there is no more input data to process, after exiting from the `run()` method, the task enters its shutdown process.
        +Initially, the timer service stops registering any new timers (e.g. from fired timers that are being executed), clears all the not yet started timers, and awaits the
        +completion of currently executing ones. Then the `closeAllOperators()` tries to gracefully close the operators involved in the computation by calling the `close()` method
        +of each one of them. Then any buffered output data is flushed so that they can be processed by the downstream tasks, and finally the task tries to clear all the resources
        +held by the operators by calling the `dispose()` method of each one of them. When opening the different operators, we mentioned that the order is from the last to the first.
        +Closing happens the other way round, from first to last.
        +
        +Finally, when all operators have been closed and all their resources freed, the task shuts down its timer service, performs its task-specific cleanup, e.g. cleaning all its
        +internal buffers, and then performs its generic task cleanup, which consists of closing all its output channels and cleaning any output buffers.
        +
        +*Checkpoints:* Previously we saw that during `initializeState()`, and in case of recovering from a failure, the task and all its operators and functions retrieve the
        +state that was persisted to stable storage during the last successful checkpoint before the failure. Checkpoints in Flink are performed periodically based on a user-specified
        +interval, and are performed by a different thread than that of the main task thread. This is the reason they are not included in the main phases of the task lifecycle. In a
        +nutshell, special elements called `CheckpointBarriers` are injected periodically by the source tasks of a job in the stream of input data, and travel with the actual data from
        +source to sink. A source task injects these barriers after it is in running mode, and assuming that the CheckpointCoordinator is also running. Whenever a task receives such a
        — End diff –

        CheckpointCoordinator -> `CheckpointCoordinator`?
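The ordering described in the quoted diff (open from the last operator of the chain to the first, run, quiesce timers, close from first to last, flush, then dispose) can be traced with a small Java sketch. `ChainedOperator`, `ChainLifecycleSketch` and `runChain` are hypothetical names that only record call order; they are not Flink's `StreamTask` or `StreamOperator` API. The sketch also assumes that every operator forwards every record and that `dispose()` follows the same first-to-last order as `close()`, which the diff does not state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one operator in a chain (NOT Flink's StreamOperator API).
interface ChainedOperator {
    String name();
}

final class ChainLifecycleSketch {

    // Records the order in which a task would call lifecycle methods on a chain.
    // chain.get(0) consumes the task input; the last element produces the task output.
    static List<String> runChain(List<ChainedOperator> chain, List<String> input) {
        List<String> log = new ArrayList<>();

        // open(): last operator first, so downstream operators are ready
        // before the head starts emitting records.
        for (int i = chain.size() - 1; i >= 0; i--) {
            log.add("open " + chain.get(i).name());
        }

        // run(): each record flows from head to tail (assumes every operator forwards it).
        for (String record : input) {
            for (ChainedOperator op : chain) {
                log.add("process " + op.name() + " " + record);
            }
        }

        // Shutdown once the finite input is exhausted.
        log.add("quiesce timers");
        // close(): first operator to last.
        for (ChainedOperator op : chain) {
            log.add("close " + op.name());
        }
        log.add("flush output");
        // dispose(): order assumed to match close() in this sketch.
        for (ChainedOperator op : chain) {
            log.add("dispose " + op.name());
        }
        return log;
    }

    static ChainedOperator op(String name) {
        return () -> name;
    }

    public static void main(String[] args) {
        List<ChainedOperator> chain = List.of(op("map"), op("filter"), op("sink"));
        runChain(chain, List.of("a")).forEach(System.out::println);
    }
}
```

Opening in reverse and closing forward means the head operator is the last to start and the first to stop, so no operator emits into a neighbor that is not open.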

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r103482008

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,149 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. This is the place where each parallel instance of your operators gets executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +As the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator. Given this, it is worth spending a few lines
        +simply mentioning the basic methods representing the lifecycle of an operator before diving into those of the `StreamTask` itself. The list is presented below in
        +the order that they are called. These are:
        +
        + // initialization
        + initializeState()
        + setup()
        + open()
        +
        + // processing
        + processElement()
        + processWatermark()
        +
        + // termination
        + close()
        + dispose()
        +
        + // checkpointing
        + snapshotState()
        +
        +In a nutshell, the `initializeState()` gives an operator its initial state. After obtaining it, the `setup()` is called to initialize some operator specific
        +machinery, such as its `RuntimeContext` and its metric collection data-structures, and the `open()` executes any operator-specific initialization, such as
        +opening the user-defined function in the case of the `AbstractUdfStreamOperator`.
        +
        +Now that everything is set, the operator is ready to process fresh incoming data. This is done by invoking the `processElement()` and `processWatermark()` methods which
        — End diff –

        remove "fresh"?
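The normal and failure paths of the operator lifecycle in the quoted diff can be contrasted with the following Java sketch: `close()` is reached only after a fault-free pass over a finite input, while `dispose()` runs on both paths. `OperatorLifecycleSketch`, `LoggingOperator` and `drive` are hypothetical; the sketch calls `setup()` before `initializeState()`, following the order given in the later revision of this document quoted further down this page, and it models a failure as an exception thrown from `processElement()`.

```java
import java.util.ArrayList;
import java.util.List;

final class OperatorLifecycleSketch {

    // Hypothetical operator that logs every lifecycle call (not Flink's API).
    static final class LoggingOperator {
        final List<String> log = new ArrayList<>();
        // Record value that triggers a simulated failure, or null for none.
        final String failOn;

        LoggingOperator(String failOn) {
            this.failOn = failOn;
        }

        void setup() { log.add("setup"); }
        void initializeState() { log.add("initializeState"); }
        void open() { log.add("open"); }

        void processElement(String value) {
            if (value.equals(failOn)) {
                throw new IllegalStateException("simulated failure on " + value);
            }
            log.add("processElement " + value);
        }

        void processWatermark(long timestamp) { log.add("processWatermark " + timestamp); }
        void close() { log.add("close"); }
        void dispose() { log.add("dispose"); }
    }

    // Drives one operator over a finite input. close() is reached only on the
    // fault-free path; dispose() runs in both cases via the finally block.
    static List<String> drive(LoggingOperator op, List<String> input, long finalWatermark) {
        try {
            op.setup();
            op.initializeState();
            op.open();
            for (String value : input) {
                op.processElement(value);
            }
            op.processWatermark(finalWatermark);
            op.close();
        } catch (RuntimeException e) {
            op.log.add("failed: " + e.getMessage());
        } finally {
            op.dispose();
        }
        return op.log;
    }

    public static void main(String[] args) {
        System.out.println(drive(new LoggingOperator(null), List.of("a", "b"), 10L));
        System.out.println(drive(new LoggingOperator("b"), List.of("a", "b", "c"), 10L));
    }
}
```

Putting `dispose()` in a `finally` block is one simple way to guarantee resource release on every path, which is the property the diff describes.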

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104126670

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        + UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        + UDF::open
        +
        + // processing
        + OPERATOR::processElement
        — End diff –

        Maybe we could somehow indicate that those functions are not called once, but in loops. Thereby, the operator might call methods on the user function. `run()` is just one possible method, if the UDF is a source function.
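To illustrate the point that processing methods run in loops rather than once, here is a Java sketch under simplified assumptions: a hypothetical source UDF whose `run()` is called once and loops internally, emitting each record into an operator whose `processElement()` is called once per record and in turn calls a map UDF once per call. `LoopedCallsSketch`, `CountingSource` and `MapOperator` are illustrative names, not Flink classes; `java.util.function.Function` stands in for a `MapFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class LoopedCallsSketch {

    // Hypothetical source UDF: run() is invoked once and loops internally,
    // emitting one record per iteration.
    static final class CountingSource {
        private final int count;

        CountingSource(int count) {
            this.count = count;
        }

        void run(Consumer<Integer> emit) {
            for (int i = 0; i < count; i++) {
                emit.accept(i);
            }
        }
    }

    // Hypothetical operator wrapping a map UDF: processElement() is invoked
    // once per incoming record and calls the UDF once per invocation.
    static final class MapOperator {
        private final Function<Integer, Integer> udf;
        final List<Integer> output = new ArrayList<>();
        int processElementCalls = 0;

        MapOperator(Function<Integer, Integer> udf) {
            this.udf = udf;
        }

        void processElement(Integer value) {
            processElementCalls++;
            output.add(udf.apply(value));
        }
    }

    // Wires one source run into the map operator: a single run() call
    // results in `count` processElement() calls.
    static MapOperator runPipeline(int count) {
        MapOperator mapOperator = new MapOperator(x -> x * 10);
        new CountingSource(count).run(mapOperator::processElement);
        return mapOperator;
    }

    public static void main(String[] args) {
        MapOperator result = runPipeline(3);
        System.out.println(result.processElementCalls + " calls, output " + result.output);
    }
}
```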

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104126746

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        + UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        + UDF::open
        +
        + // processing
        + OPERATOR::processElement
        + UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        — End diff –

        Again, this could be called multiple times in the lifecycle. Once for each triggered checkpoint.
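The note that `snapshotState()` may run many times, once per triggered checkpoint, can be sketched in Java as follows. A hypothetical `SummingOperator` keeps a running sum; each barrier in the input triggers one `snapshotState()` call whose result is stored by checkpoint id in a map standing in for a state backend; and `initializeState()` serves both a fresh start and a restore. This is a single-threaded simplification: it does not model asynchronous snapshots, barrier alignment across inputs, or any real Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CheckpointSketch {

    // Hypothetical stream element: a data record or a checkpoint barrier.
    static final class Element {
        final boolean isBarrier;
        final long checkpointId;
        final int value;

        private Element(boolean isBarrier, long checkpointId, int value) {
            this.isBarrier = isBarrier;
            this.checkpointId = checkpointId;
            this.value = value;
        }

        static Element data(int value) { return new Element(false, -1L, value); }
        static Element barrier(long checkpointId) { return new Element(true, checkpointId, 0); }
    }

    // Hypothetical stateful operator keeping a running sum.
    static final class SummingOperator {
        long sum;
        int snapshotCalls = 0;

        // One entry point for both cases: restored == null means a fresh start,
        // otherwise the state comes from a completed checkpoint.
        void initializeState(Long restored) {
            sum = (restored == null) ? 0L : restored;
        }

        void processElement(int value) {
            sum += value;
        }

        // Called once per barrier, i.e. once per triggered checkpoint.
        long snapshotState() {
            snapshotCalls++;
            return sum;
        }
    }

    // Feeds the stream to the operator in a single thread; the returned map
    // (checkpoint id -> snapshot) stands in for a state backend.
    static Map<Long, Long> run(SummingOperator op, List<Element> stream) {
        Map<Long, Long> completed = new TreeMap<>();
        for (Element e : stream) {
            if (e.isBarrier) {
                completed.put(e.checkpointId, op.snapshotState());
            } else {
                op.processElement(e.value);
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        SummingOperator op = new SummingOperator();
        op.initializeState(null);
        Map<Long, Long> checkpoints = run(op, List.of(
                Element.data(1), Element.data(2), Element.barrier(1),
                Element.data(3), Element.barrier(2), Element.data(4)));
        SummingOperator restored = new SummingOperator();
        restored.initializeState(checkpoints.get(2L));
        System.out.println(checkpoints + ", restored sum " + restored.sum);
    }
}
```

Restoring from checkpoint 2 yields the sum as of that barrier, so the record that arrived after it would have to be replayed by the source, which is the role barriers play in the mechanism the diff describes.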

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104128750

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        +     UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        +     UDF::open
        +
        + // processing
        + OPERATOR::processElement
        +     UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        +
        + // termination
        + OPERATOR::close
        +     UDF::close
        + OPERATOR::dispose
        +
        +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
        +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
        + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of
        +the `AbstractUdfStreamOperator`.
        +
        +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the
        +state of the operator during its initial execution (e.g. registering any keyed state), and also the logic to retrieve its
        +state from a checkpoint after a failure. More on this in the rest of this page.
        +
        +Now that everything is set, the operator is ready to process incoming data. This is done by invoking the `processElement()`
        +and `processWatermark()` methods, which contain the logic for processing elements and watermarks, respectively. The
        +`processElement()` is also the place where your function's logic is invoked, e.g. the `map()` method of your `MapFunction`.
        +
        +Finally, in the case of a normal, fault-free termination of the operator (e.g. if the stream is finite and its end is
        +reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic, and
        +the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`
        +and skips any intermediate phases between the phase the operator was in when the failure happened and the `dispose()`.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described
        +above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, i.e. after the
        +operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator
        +to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it will be retrieved when
        +the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism,
        +and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation:
        +[Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Following that brief introduction on the operator's main phases, this section describes in more detail how a task calls
        +the respective methods during its execution on a cluster. The sequence of the phases described here is mainly included
        +in the `invoke()` method of the `StreamTask` class. The remainder of this document is split into two subsections, one
        +describing the phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and
        +(a shorter) one describing the different sequence followed in case the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually, or due to some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + TASK::setInitialState
        — End diff –

        Again, I suggest always indicating, already in the overview (and even more so in the detailed description), which methods are called just once and which are called in a loop (and perhaps what the loop's termination condition is).
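        To illustrate the distinction asked for here, below is a minimal plain-Java sketch (no Flink dependency) that drives an operator through the phases listed in the diff and records every call. All names (`SketchOperator`, `SketchEvent`, `RecordingOperator`, `SketchTaskDriver`) are hypothetical stand-ins, not Flink's `StreamOperator` or `StreamTask` API, and checkpointing (`snapshotState`) is left out. The sketch assumes that the initialization and termination methods run once, while `processElement`/`processWatermark` run once per record until a finite input is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified operator interface; NOT Flink's StreamOperator API. */
interface SketchOperator {
    void setup();
    void initializeState();
    void open();
    void processElement(long value);
    void processWatermark(long timestamp);
    void close();
    void dispose();
}

/** Hypothetical input record: either a data element or a watermark. */
final class SketchEvent {
    final boolean isWatermark;
    final long value;

    private SketchEvent(boolean isWatermark, long value) {
        this.isWatermark = isWatermark;
        this.value = value;
    }

    static SketchEvent element(long value) { return new SketchEvent(false, value); }
    static SketchEvent watermark(long timestamp) { return new SketchEvent(true, timestamp); }
}

/** Records the name of every lifecycle method that is invoked. */
class RecordingOperator implements SketchOperator {
    final List<String> calls = new ArrayList<>();

    public void setup() { calls.add("setup"); }
    public void initializeState() { calls.add("initializeState"); }
    public void open() { calls.add("open"); }
    public void processElement(long value) { calls.add("processElement"); }
    public void processWatermark(long timestamp) { calls.add("processWatermark"); }
    public void close() { calls.add("close"); }
    public void dispose() { calls.add("dispose"); }

    /** Number of times the given lifecycle method was called. */
    long count(String method) {
        return calls.stream().filter(method::equals).count();
    }
}

/** Hypothetical driver: annotates which calls happen once and which happen per record. */
class SketchTaskDriver {
    static void run(SketchOperator op, SketchEvent... input) {
        // Initialization: each method is called exactly once, in this order.
        op.setup();
        op.initializeState();
        op.open();

        // Processing: one call per record; the loop ends when the finite input is exhausted.
        for (SketchEvent event : input) {
            if (event.isWatermark) {
                op.processWatermark(event.value);
            } else {
                op.processElement(event.value);
            }
        }

        // Termination (fault-free path only): each method is called exactly once.
        op.close();
        op.dispose();
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        run(op, SketchEvent.element(1), SketchEvent.element(2), SketchEvent.watermark(10));
        System.out.println(op.calls);
    }
}
```

        Running `SketchTaskDriver.main` prints `[setup, initializeState, open, processElement, processElement, processWatermark, close, dispose]`: only the processing methods repeat.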

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104126990

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        +     UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        +     UDF::open
        +
        + // processing
        + OPERATOR::processElement
        +     UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        +
        + // termination
        + OPERATOR::close
        +     UDF::close
        + OPERATOR::dispose
        +
        +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
        +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
        + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of
        +the `AbstractUdfStreamOperator`.
        +
        +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the
        +state of the operator during its initial execution (e.g. registering any keyed state), and also the logic to retrieve its
        +state from a checkpoint after a failure. More on this in the rest of this page.
        +
        +Now that everything is set, the operator is ready to process incoming data. This is done by invoking the `processElement()`
        — End diff –

        At least here, we should say that this happens in a loop, until some condition causes a break. Otherwise, there is the misleading interpretation that these methods are called once and that the looping happens inside them.
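        A minimal sketch of what "happening as a loop" could look like, assuming a single-threaded task that pulls records from a queue: the loop, not `processElement`, owns the iteration, and it exits either when a finite input is exhausted or when a cancellation flag is cleared. `SketchInputLoop` and its methods are hypothetical, not Flink classes; since a cancellation request may arrive from another thread, the flag is declared `volatile`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch of a task's processing loop: processElement is invoked by the loop
 * once per record, rather than being called once and looping internally.
 */
class SketchInputLoop {
    private final Deque<Long> input;
    private volatile boolean running = true;

    SketchInputLoop(Deque<Long> input) {
        this.input = input;
    }

    /** Requests termination; the loop checks the flag before taking the next record. */
    void cancel() {
        running = false;
    }

    /** Runs until the input is exhausted or cancel() was called; returns the number of records processed. */
    int run(LongConsumer processElement) {
        int processed = 0;
        while (running) {               // termination condition 1: cancellation
            Long next = input.poll();
            if (next == null) {         // termination condition 2: end of a finite input
                break;
            }
            processElement.accept(next); // exactly one call per record
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        SketchInputLoop loop = new SketchInputLoop(new ArrayDeque<>(List.of(1L, 2L, 3L, 4L)));
        int processed = loop.run(value -> {
            if (value == 2L) {
                loop.cancel();          // simulated cancellation request
            }
        });
        System.out.println(processed);
    }
}
```

        In `main`, cancelling while the second of four records is processed makes `run` return 2 and leaves the last two records unprocessed in the queue.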

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104125117

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        — End diff –

        You could start with "`StreamTask` is the basis...", and only after that go through its different phases.

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104126474

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        +     UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        +     UDF::open
        +
        + // processing
        + OPERATOR::processElement
        +     UDF::run
        — End diff –

        `UDF::run` is actually just a special case for source functions. Other functions could have methods like `apply` that are called in the operator's main loop.
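        The difference between the two call patterns can be sketched as follows. The interfaces are hypothetical simplifications (loosely inspired by the idea of a source function with `run`/`cancel`, but not Flink's actual `SourceFunction` signature): a source UDF receives a single `run` call and loops internally until it runs out of data or is cancelled, whereas a per-record UDF such as a map function is invoked by the operator once per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical source UDF: run() is called once and loops internally, emitting records. */
interface SketchSource<T> {
    void run(Consumer<T> out);
    void cancel();
}

/** Hypothetical per-record UDF: map() is called by the operator once per incoming element. */
interface SketchMapFunction<I, O> {
    O map(I value);
}

/** Emits 0, 1, ..., limit - 1, stopping early if cancel() is called. */
class CountingSource implements SketchSource<Integer> {
    private final int limit;
    private volatile boolean running = true;

    CountingSource(int limit) {
        this.limit = limit;
    }

    public void run(Consumer<Integer> out) {
        for (int i = 0; i < limit && running; i++) {
            out.accept(i);
        }
    }

    public void cancel() {
        running = false;
    }
}

class SketchUdfDemo {
    /** Source-style operator: hands control to the UDF's own loop with a single call. */
    static <T> List<T> runSource(SketchSource<T> source) {
        List<T> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    /** Map-style operator: the operator owns the loop and calls the UDF per element. */
    static <I, O> List<O> runMap(List<I> input, SketchMapFunction<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I element : input) {
            out.add(fn.map(element)); // analogous to processElement() invoking map()
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runSource(new CountingSource(3)));
        System.out.println(runMap(List.of("a", "bb"), String::length));
    }
}
```

        `main` prints `[0, 1, 2]` followed by `[1, 2]`: one `run` call produced three records, while `map` was called twice, once per input element.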

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104128356

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of the UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the base class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        +     UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        +     UDF::open
        +
        + // processing
        + OPERATOR::processElement
        +     UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        +
        + // termination
        + OPERATOR::close
        +     UDF::close
        + OPERATOR::dispose
        +
        +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
        +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
        + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of
        +the `AbstractUdfStreamOperator`.
        +
        +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the
        +state of the operator during its initial execution (e.g. registering any keyed state), and also the logic to retrieve its
        +state from a checkpoint after a failure. More on this in the rest of this page.
        +
        +Now that everything is set, the operator is ready to process incoming data. This is done by invoking the `processElement()`
        +and `processWatermark()` methods, which contain the logic for processing elements and watermarks, respectively. The
        +`processElement()` is also the place where your function's logic is invoked, e.g. the `map()` method of your `MapFunction`.
        +
        +Finally, in the case of a normal, fault-free termination of the operator (e.g. if the stream is finite and its end is
        +reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic, and
        — End diff –

        We could give examples to make the difference between `close` and `dispose` easier to understand, particularly as the current explanation is not 100% accurate. I suggest saying that, e.g., `close` could close some streams (which could also be seen as releasing resources) or shut down the operator's reactions to input; in a broader sense, it stops the operator from running. In contrast, `dispose` could release native memory that holds the operator's internal data; in a broader sense, it releases all resources that are still around after the operator has stopped.
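        A minimal sketch of the split suggested here, assuming a single operator that buffers output and holds some resource: `close()` stops reacting to input and flushes what is pending, while `dispose()` releases whatever is still held. On the fault-free path both run, in that order; when processing fails, `close()` is skipped and only `dispose()` runs, matching the failure behavior described in the diff. `BufferingSketchOperator` and `CloseVsDisposeDemo` are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator that buffers output and holds a resource; NOT a Flink class. */
class BufferingSketchOperator {
    final List<String> buffer = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();
    boolean acceptingInput = false;
    boolean resourceHeld = false;

    void open() {
        resourceHeld = true;   // stands in for e.g. allocating memory or a connection
        acceptingInput = true;
    }

    void processElement(String value) {
        if (!acceptingInput) {
            throw new IllegalStateException("operator is closed");
        }
        buffer.add(value);
        if (value.equals("boom")) {
            throw new RuntimeException("simulated failure");
        }
    }

    /** close(): stop reacting to input and flush pending results (final bookkeeping). */
    void close() {
        acceptingInput = false;
        emitted.addAll(buffer);
        buffer.clear();
    }

    /** dispose(): release whatever is still held after stopping; runs on every path. */
    void dispose() {
        resourceHeld = false;
        buffer.clear();
    }
}

class CloseVsDisposeDemo {
    /** Normal path: close() then dispose(). On failure: close() is skipped, dispose() still runs. */
    static boolean runTask(BufferingSketchOperator op, String... input) {
        op.open();
        try {
            for (String value : input) {
                op.processElement(value);
            }
            op.close();
            return true;
        } catch (RuntimeException e) {
            // Swallowed only so the demo can report the outcome; a real task would propagate it.
            return false;
        } finally {
            op.dispose();
        }
    }

    public static void main(String[] args) {
        BufferingSketchOperator ok = new BufferingSketchOperator();
        System.out.println(runTask(ok, "a", "b") + " " + ok.emitted);
        BufferingSketchOperator failed = new BufferingSketchOperator();
        System.out.println(runTask(failed, "a", "boom") + " " + failed.emitted);
    }
}
```

        `main` prints `true [a, b]` and then `false []`: the buffered `a` in the failed run is never flushed because `close()` is skipped, yet the resource is still released by `dispose()`.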

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104127245

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the basic class for all operators that execute UDFs.
        +
        + // initialization
        — End diff –

        I would suggest appending `phase` to the labels `initialization`, `processing`, etc. (e.g. `// initialization phase`).
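        To illustrate the suggested labelling, here is a sketch that groups the listing from the diff into named phases. The method strings are copied from the listing under review. The `LifecyclePhases` enum itself is hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical grouping of the lifecycle listing into labelled phases.
// The method names are copied verbatim from the listing in the diff.
public class LifecyclePhases {

    enum Phase {
        INITIALIZATION_PHASE("OPERATOR::setup", "UDF::setRuntimeContext",
                "OPERATOR::initializeState", "OPERATOR::open", "UDF::open"),
        PROCESSING_PHASE("OPERATOR::processElement", "UDF::run",
                "OPERATOR::processWatermark"),
        CHECKPOINTING_PHASE("OPERATOR::snapshotState"),
        TERMINATION_PHASE("OPERATOR::close", "UDF::close", "OPERATOR::dispose");

        final List<String> methods;

        Phase(String... methods) {
            this.methods = Arrays.asList(methods);
        }
    }

    public static void main(String[] args) {
        // Print each phase label followed by the methods called in it.
        for (Phase phase : Phase.values()) {
            System.out.println("// " + phase + ": " + phase.methods);
        }
    }
}
```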

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104128587

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the basic class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        + UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        + UDF::open
        +
        + // processing
        + OPERATOR::processElement
        + UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        +
        + // termination
        + OPERATOR::close
        + UDF::close
        + OPERATOR::dispose
        +
        +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
        +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
        + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of
        +the `AbstractUdfStreamOperator`.
        +
        +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the
        +state of the operator during its initial execution (e.g. register any keyed state), and also the logic to retrieve its
        +state from a checkpoint after a failure. More about this on the rest of this page.
        +
        +Now that everything is set, the operator is ready to process incoming data. This is done by invoking the `processElement()`
        +and `processWatermark()` methods which contain the logic for processing elements and watermarks, respectively. The
        +`processElement()` is also the place where you function's logic is invoked, e.g. the `map()` method of your `MapFunction`.
        +
        +Finally, in the case of a normal, fault-free termination of the operator (e.g. if the stream is finite and its end is
        +reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic, and
        +the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`
        — End diff –

        This is also why failures go immediately to `dispose`: the operator is already stopped (because of the failure), but its resources are not yet all released.
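        A minimal sketch of the control flow described in the diff and in this comment, assuming a simplified driver. `runOperator` and the trace strings are hypothetical. `close` is reached only when processing completes without a failure. `dispose` sits in a `finally` block, so it runs on every path. The sketch catches the exception so it can record a trace; a real task would still report the failure afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical driver: close() only on fault-free termination, dispose() always.
public class TerminationPathSketch {

    // Returns the lifecycle steps taken; failOn (may be null) names an element
    // whose processing throws, simulating a failure during the processing phase.
    static List<String> runOperator(List<String> input, String failOn) {
        List<String> trace = new ArrayList<>();
        try {
            trace.add("open");
            for (String element : input) {
                if (element.equals(failOn)) {
                    throw new RuntimeException("failure while processing " + element);
                }
                trace.add("processElement(" + element + ")");
            }
            // Reached only if processing finished without a failure.
            trace.add("close");
        } catch (RuntimeException e) {
            // The failure already stopped the operator; there is nothing to close.
            trace.add("failed");
        } finally {
            // Remaining resources are released regardless of how processing ended.
            trace.add("dispose");
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(runOperator(Arrays.asList("a", "b"), null));
        // [open, processElement(a), processElement(b), close, dispose]
        System.out.println(runOperator(Arrays.asList("a", "b"), "b"));
        // [open, processElement(a), failed, dispose]
    }
}
```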

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104124935

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        — End diff –

        "A task ... is the place where each parallel instance...": one could read this as all parallel instances running in one task, which is misleading.
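        A small sketch of the intended reading: an operator with parallelism N is run by N tasks, and each task executes exactly one parallel instance. The `Task` class and its `Name (i/N)` display format are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one task per parallel instance of an operator.
public class ParallelInstancesSketch {

    static final class Task {
        final String operatorName;
        final int subtaskIndex; // 0-based index of the parallel instance this task runs
        final int parallelism;

        Task(String operatorName, int subtaskIndex, int parallelism) {
            this.operatorName = operatorName;
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        @Override
        public String toString() {
            return operatorName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    // Parallelism N yields N tasks, each owning a distinct subtask index.
    static List<Task> tasksFor(String operatorName, int parallelism) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task(operatorName, i, parallelism));
        }
        return tasks;
    }

    public static void main(String[] args) {
        for (Task task : tasksFor("Map", 3)) {
            System.out.println(task); // Map (1/3), then Map (2/3), then Map (3/3)
        }
    }
}
```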

        githubbot ASF GitHub Bot added a comment -

        Github user StefanRRichter commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104125316

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        — End diff –

        I think it is more precise to say "...that executes a parallel instance of an operator".

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3429#discussion_r104136196

        — Diff: docs/internals/task_lifecycle.md —
        @@ -0,0 +1,186 @@
        +---
        +title: "Task Lifecycle"
        +nav-title: Task Lifecycle
        +nav-parent_id: internals
        +nav-pos: 5
        +---
        +<!--
        +Licensed to the Apache Software Foundation (ASF) under one
        +or more contributor license agreements. See the NOTICE file
        +distributed with this work for additional information
        +regarding copyright ownership. The ASF licenses this file
        +to you under the Apache License, Version 2.0 (the
        +"License"); you may not use this file except in compliance
        +with the License. You may obtain a copy of the License at
        +
        + http://www.apache.org/licenses/LICENSE-2.0
        +
        +Unless required by applicable law or agreed to in writing,
        +software distributed under the License is distributed on an
        +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        +KIND, either express or implied. See the License for the
        +specific language governing permissions and limitations
        +under the License.
        +-->
        +
        +A task in Flink is the basic unit of execution. It is the place where each parallel instance of an operator is executed.
        +
        +This document goes through the different phases in the lifecycle of the `StreamTask`, which is the base for all different
        +task sub-types in Flink's streaming engine.
        +
        +* This will be replaced by the TOC
        +{:toc}
        +
        +## Operator Lifecycle in a nutshell
        +
        +Because the task is the entity that executes your operators, its lifecycle is tightly integrated with that of an operator.
        +So, we will briefly mention the basic methods representing the lifecycle of an operator before diving into those of the
        +`StreamTask` itself. The list is presented below in the order that each of the methods is called. Given that an operator
        +can have a user-defined function (UDF), below each of the operator methods we also present (indented) the methods in
        +the lifecycle of UDF that it calls. These methods are available if your operator extends the `AbstractUdfStreamOperator`,
        +which is the basic class for all operators that execute UDFs.
        +
        + // initialization
        + OPERATOR::setup
        + UDF::setRuntimeContext
        + OPERATOR::initializeState
        + OPERATOR::open
        + UDF::open
        +
        + // processing
        + OPERATOR::processElement
        + UDF::run
        + OPERATOR::processWatermark
        +
        + // checkpointing (called asynchronously)
        + OPERATOR::snapshotState
        +
        + // termination
        + OPERATOR::close
        + UDF::close
        + OPERATOR::dispose
        +
        +In a nutshell, the `setup()` is called to initialize some operator-specific machinery, such as its `RuntimeContext` and
        +its metric collection data-structures. After this, the `initializeState()` gives an operator its initial state, and the
        + `open()` method executes any operator-specific initialization, such as opening the user-defined function in the case of
        +the `AbstractUdfStreamOperator`.
        +
        +<span class="label label-danger">Attention</span> The `initializeState()` contains both the logic for initializing the
        +state of the operator during its initial execution (e.g. register any keyed state), and also the logic to retrieve its
        +state from a checkpoint after a failure. More about this on the rest of this page.
        +
        +Now that everything is set, the operator is ready to process incoming data. This is done by invoking the `processElement()`
        +and `processWatermark()` methods which contain the logic for processing elements and watermarks, respectively. The
        +`processElement()` is also the place where you function's logic is invoked, e.g. the `map()` method of your `MapFunction`.
        +
        +Finally, in the case of a normal, fault-free termination of the operator (e.g. if the stream is finite and its end is
        +reached), the `close()` method is called to perform any final bookkeeping action required by the operator's logic, and
        +the `dispose()` is called after that to free any resources held by the operator.
        +
        +In the case of a termination due to a failure or due to manual cancellation, the execution jumps directly to the `dispose()`
        +and skips any intermediate phases between the phase the operator was in when the failure happened and the `dispose()`.
        +
        +*Checkpoints:* The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described
        +above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, i.e. after the
        +operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator
        +to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it will be retrieved when
        +the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism,
        +and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation:
        +[Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
        +
        +## Task Lifecycle
        +
        +Following that brief introduction to the operator's main phases, this section describes in more detail how a task calls
        +the respective methods during its execution on a cluster. The sequence of the phases described here is implemented mainly
        +in the `invoke()` method of the `StreamTask` class. The remainder of this document is split into two subsections, one
        +describing the phases during a regular, fault-free execution of a task (see [Normal Execution](#normal-execution)), and
        +(a shorter) one describing the different sequence followed in case the task is cancelled (see [Interrupted Execution](#interrupted-execution)),
        +either manually or due to some other reason, e.g. an exception thrown during execution.
        +
        +### Normal Execution
        +
        +The steps a task goes through when executed until completion without being interrupted are illustrated below:
        +
        + TASK::setInitialState
        -- End diff --

        In the task, all the described methods are called once.
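The ordering described in the diff can be illustrated with a small, self-contained Java simulation. On normal termination the sequence is `setup`, `initializeState`, `open`, processing, then `close` followed by `dispose`. On failure, execution jumps straight to `dispose`. Each lifecycle method runs once per operator instance. This is a sketch under stated assumptions, not Flink code. `LifecycleSketch`, `RecordingOperator`, `run()`, the `"failed"` marker, and the `"poison"` element used to trigger a failure are all hypothetical names. `snapshotState()` is invoked in-line after a chosen element to stand in for an arriving checkpoint barrier, whereas the document says Flink calls it asynchronously to the other methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the operator lifecycle order; none of these
// classes are Flink APIs, they only record the order in which calls happen.
public class LifecycleSketch {

    static final class RecordingOperator {
        final List<String> calls = new ArrayList<>();
        private boolean isOpen = false;

        void setup() { calls.add("setup"); }
        void initializeState() { calls.add("initializeState"); }
        void open() { calls.add("open"); isOpen = true; }

        void processElement(String element) {
            // Hypothetical failure trigger used to exercise the failure path.
            if ("poison".equals(element)) {
                throw new IllegalStateException("failure while processing " + element);
            }
            calls.add("processElement(" + element + ")");
        }

        void snapshotState(long checkpointId) {
            // Checkpoints are only taken after open() and before close().
            if (!isOpen) {
                throw new IllegalStateException("snapshot outside the processing phase");
            }
            calls.add("snapshotState(" + checkpointId + ")");
        }

        void close() { calls.add("close"); isOpen = false; }
        void dispose() { calls.add("dispose"); isOpen = false; }
    }

    // Drives one operator through initialization, processing and termination.
    // A checkpoint is simulated right after the element at checkpointAfterIndex.
    static List<String> run(List<String> input, int checkpointAfterIndex) {
        RecordingOperator op = new RecordingOperator();
        try {
            op.setup();
            op.initializeState();
            op.open();
            for (int i = 0; i < input.size(); i++) {
                op.processElement(input.get(i));
                if (i == checkpointAfterIndex) {
                    op.snapshotState(1L);
                }
            }
            op.close(); // only reached on normal, fault-free termination
        } catch (RuntimeException e) {
            op.calls.add("failed"); // marker only, not a lifecycle method
        } finally {
            op.dispose(); // reached on both the normal and the failure path
        }
        return op.calls;
    }

    public static void main(String[] args) {
        // [setup, initializeState, open, processElement(a), snapshotState(1), processElement(b), close, dispose]
        System.out.println(run(Arrays.asList("a", "b"), 0));
        // [setup, initializeState, open, processElement(a), failed, dispose]
        System.out.println(run(Arrays.asList("a", "poison", "b"), 5));
    }
}
```

Placing `dispose()` in a `finally` block while `close()` sits at the end of the `try` body is one simple way to express the rule that a failure skips the remaining phases and goes directly to `dispose()`. It is not necessarily how `StreamTask` itself is structured.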

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on the issue:

        https://github.com/apache/flink/pull/3429

        Thanks a lot for the review, @StefanRRichter. I will integrate your comments and merge.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3429

        kkl0u Kostas Kloudas added a comment -

        Merged at a2e6fb06c404a31294c53903fbaeaac666403dc8


          People

          • Assignee:
            kkl0u Kostas Kloudas
            Reporter:
            kkl0u Kostas Kloudas
          • Votes:
            0
            Watchers:
            2
