Flink / FLINK-5183

[py] Support multiple jobs per Python plan file

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.1.3
    • Fix Version/s: 1.3.0
    • Component/s: Python API
    • Labels: None

      Description

      Support running multiple jobs per Python plan file.

          Activity

Geoffrey Mon added a comment - edited

I'm currently working on this at https://github.com/geofbot/flink/tree/FLINK-5183. So far I've managed to get it to work, except for a few issues:

• Execution environments need a unique ID that can be used to identify them between Java and Python. At the moment, these IDs are manually assigned.
  • I've achieved this using global variables in the Environment module of the Python API.
• PythonPlanBinder does not exit when the Python process exits, but instead waits indefinitely for more jobs.
  • Although a while loop checks whether the Python process is still running before waiting for more jobs, there is a race condition: the Python process usually ends right after Java checks that it is running (unless you pause it in a debugger, giving Python time to exit). Java then expects more jobs and waits indefinitely.
• Global variables are used to differentiate between execution environments and, when running operators, to skip over execution environments that do not contain the operator to be run (there should be a better solution).
Chesnay Schepler added a comment -

Awesome stuff. Could you not fix the second issue by specifying a 5-second timeout on the ServerSocket?
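For reference, a minimal sketch of that idea, assuming a hypothetical liveness check; this is not the actual PythonPlanStreamer code:

```
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.function.BooleanSupplier;

// Hedged sketch: accept() unblocks every 5 seconds so the binder can
// re-check whether the Python process is still alive, instead of
// blocking forever on a process that has already exited.
public class TimeoutAcceptSketch {
	public static void serve(ServerSocket server, BooleanSupplier pythonAlive) throws IOException {
		server.setSoTimeout(5000); // accept() throws SocketTimeoutException after 5s
		while (pythonAlive.getAsBoolean()) {
			try (Socket plan = server.accept()) {
				// ... receive and execute the next job's plan ...
			} catch (SocketTimeoutException ste) {
				// No new job arrived within 5s: loop back and re-check liveness.
			}
		}
	}
}
```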

Geoffrey Mon added a comment -

That solved the problem. I was really overthinking it; I had actually been planning to spawn another thread to check whether the Python process had died and to close the PythonPlanStreamer ServerSocket from that thread. Once I add a simple unit test and make sure the feature works as expected, I'll create a pull request so the actual code quality can be reviewed.

ASF GitHub Bot added a comment -

          GitHub user GEOFBOT opened a pull request:

          https://github.com/apache/flink/pull/3232

FLINK-5183 [py] Support multiple jobs per plan file

          Modifies the Python API to support running multiple jobs in one Python plan file.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/GEOFBOT/flink FLINK-5183

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3232.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3232


          commit e2f4a4acda8c180e69a72709dfd777786ac6650b
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-26T14:15:55Z

FLINK-5183 [py] Support multiple jobs per plan file

          Issues to be resolved:

          • Execution environments need to be able to get a unique ID that can be
            used to identify it between languages. At the moment, these IDs are
            manually assigned.
          • PythonPlanBinder does not exit when Python process exits, waiting
            indefinitely for more jobs
          • Global variables used to run operators in Python (there should be a
            better solution)

          commit ee500403b62ebf9c8b923d5940aefb2f928a5c13
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-26T14:27:47Z

          Remove debug print statements

          commit afdeddc03f9817a74927a727733ef5c0025af0c7
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-26T17:04:23Z

          Auto assign environment IDs; rm Python alive checks that didn't help

          commit 5dac565d89a27cc113cd4abce7fbdd57485e4401
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-26T21:15:15Z

          Set 5 second timeout for PythonPlanStreamer socket

          After the timeout, we check to see if the Python process has exited or
          not. This allows PythonPlanBinder to exit properly when the Python
          process has exited.

          commit e34522e54921989db6ab37ce3b134672b6980f67
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-26T22:26:03Z

          Multiple jobs per Python plan file unit test

          commit 64c013f8a9942b05f1b3000149a66e34b7b366b1
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-27T14:05:25Z

          Fix support for operators using multiple data sets

          commit 782f69ac1d17f8ff5812216c69fa4cfd119d5a60
          Author: Geoffrey Mon <geofbot@gmail.com>
          Date: 2017-01-29T06:02:36Z

          Create Python ExecutionEnvironment after checking if Python has job

          This prevents ExecutionEnvironment#getLastJobExecutionResult from being
          null.


ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98443208

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java —

```
@@ -32,7 +32,7 @@
 	private final PythonStreamer streamer;
 	private transient final TypeInformation<OUT> typeInformation;

-	public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
+	public PythonMapPartition(String id, TypeInformation<OUT> typeInformation) {
```

— End diff —

I would propose explicitly passing the envID and setID. That makes it a lot clearer, and we don't have to split a string that we created a second ago.
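A sketch of what that could look like (class name and body are abbreviated and hypothetical, not the actual operator):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hedged sketch of the proposal: take envID and setID as separate ints
// instead of one "envID.setID" string that is split again later.
public class PythonMapPartitionSketch<OUT> {
	private final int envID;
	private final int setID;
	private final TypeInformation<OUT> typeInformation;

	public PythonMapPartitionSketch(int envID, int setID, TypeInformation<OUT> typeInformation) {
		this.envID = envID; // which ExecutionEnvironment this operator belongs to
		this.setID = setID; // which data set within that environment
		this.typeInformation = typeInformation;
	}
}
```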

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98441018

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java —

```
@@ -139,15 +139,28 @@ private void runPlan(String[] args) throws Exception {
 	String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
 	prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
 	startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));

-	receivePlan();
-	if (env instanceof LocalEnvironment) {
-		FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
-	}
-	distributeFiles(tmpPath, env);
-	JobExecutionResult jer = env.execute();
-	sendResult(jer);
+	// Python process should terminate itself when all jobs have been run
+	while (streamer.isPythonRunning()) {
+		try {
+			receivePlan();
+		} catch (SocketTimeoutException ste) {
+			// If the socket times out, check to see if Python process has exited yet
+			continue;
+		}
+
+		if (env instanceof LocalEnvironment) {
+			FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
+		}
+
+		distributeFiles(tmpPath, env);
+		JobExecutionResult jer = env.execute();
+		sendResult(jer);
+
+		environmentCounter++;
+	}
+	clearPath(tmpPath);
```

— End diff —

Why did you move this line here from L203?

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98443058

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java —

```
@@ -92,6 +96,11 @@ public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
 		values[x] = streamer.getRecord();
 	}

 	parallelism = (Integer) streamer.getRecord(true);
+
+	envID = environmentID;
+	uniqueID = "" + envID + "." + setID;
```

— End diff —

All these unique IDs appear unnecessary. Could we not simply clear the `sets` `HashMap` in the PythonPlanBinder when an execution is finished?

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98445731

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -27,17 +27,24 @@
 import sys
 from struct import pack

+_env_counter = 0
+_last_env_id = -1
+_operating = False
```

— End diff —

          We have to simplify these things.

          How about this:

Create a global container that stores all created Environments in a list. It should have a method like `create_execution_environment()` that returns a new Environment with the appropriate counter.

When `execute()` is called, we then do something like this:
```
env_id = self.container.fetch_id_of_env_to_execute()
if self.env_id == env_id:
    ...  # execute stuff as usual
else:
    return  # fall through
```

          We would then no longer need any global variable.

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98599687

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java —

```
@@ -139,15 +139,28 @@ private void runPlan(String[] args) throws Exception {
 	String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
 	prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
 	startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));

-	receivePlan();
-	if (env instanceof LocalEnvironment) {
-		FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
-	}
-	distributeFiles(tmpPath, env);
-	JobExecutionResult jer = env.execute();
-	sendResult(jer);
+	// Python process should terminate itself when all jobs have been run
+	while (streamer.isPythonRunning()) {
+		try {
+			receivePlan();
+		} catch (SocketTimeoutException ste) {
+			// If the socket times out, check to see if Python process has exited yet
+			continue;
+		}
+
+		if (env instanceof LocalEnvironment) {
+			FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
+		}
+
+		distributeFiles(tmpPath, env);
+		JobExecutionResult jer = env.execute();
+		sendResult(jer);
+
+		environmentCounter++;
+	}
+	clearPath(tmpPath);
```

— End diff —

The files necessary for plan mode (the Flink Python module, the plan file, etc.) are copied by `prepareFiles` on L140. If `clearPath` stays at L203 inside `distributeFiles`, those necessary Python files are removed after the first job executes, and subsequent jobs can't be run.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98628345

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java —

```
@@ -93,8 +95,8 @@ public void open() throws IOException {
 	}

 	private void startPython() throws IOException {
-		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "." + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
```

— End diff —

Let's use `_` instead of `.`.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98631632

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java —

```
@@ -93,8 +95,8 @@ public void open() throws IOException {
 	}

 	private void startPython() throws IOException {
-		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "." + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "." + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
```

— End diff —

Same as above.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98628019

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java —

```
@@ -139,15 +139,28 @@ private void runPlan(String[] args) throws Exception {
 	String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
 	prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
 	startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));

-	receivePlan();
-	if (env instanceof LocalEnvironment) {
-		FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
-	}
-	distributeFiles(tmpPath, env);
-	JobExecutionResult jer = env.execute();
-	sendResult(jer);
+	// Python process should terminate itself when all jobs have been run
+	while (streamer.isPythonRunning()) {
+		try {
+			receivePlan();
+		} catch (SocketTimeoutException ste) {
+			// If the socket times out, check to see if Python process has exited yet
+			continue;
+		}
+
+		if (env instanceof LocalEnvironment) {
+			FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
+		}
+
+		distributeFiles(tmpPath, env);
+		JobExecutionResult jer = env.execute();
+		sendResult(jer);
+
+		environmentCounter++;
+	}
+	clearPath(tmpPath);
```

— End diff —

Ah, yes, that makes sense.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98630794

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -163,10 +197,14 @@ def execute(self, local=False):

     The environment will execute all parts of the program that have resulted in a "sink" operation.
     """
+    global _operating, _last_env_id
     self._local_mode = local
     self._optimize_plan()

-    plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
+    if self.container.is_executing():
+        plan_mode = False
+    else:
+        plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
```

— End diff —

We can move this logic into the container as well and get rid of the `plan_mode` variable.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98628798

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -163,10 +197,14 @@ def execute(self, local=False):

     The environment will execute all parts of the program that have resulted in a "sink" operation.
     """
+    global _operating, _last_env_id
```

— End diff —

This line can be removed.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98630616

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -27,17 +27,48 @@
 import sys
 from struct import pack

+
+class EnvironmentContainer(object):
+    """Keeps track of which ExecutionEnvironment is being run."""
+
+    environment_counter = 0
+    environment_id_to_execute = None
+
+    def create_environment(self):
+        env = Environment(self, self.environment_counter)
+        self.environment_counter += 1
+        return env
+
+    def is_executing(self):
+        """Checks if we are waiting for a certain environment to be executed."""
+        return not self.environment_id_to_execute is None
+
+    def fetch_next_environment(self, calling_environment_id):
+        """Checks (and if necessary, fetches) the next environment to be executed."""
+        if not self.is_executing():
+            self.environment_id_to_execute = int(sys.stdin.readline().rstrip('\n'))
+
+        if self.environment_id_to_execute == calling_environment_id:
+            self.environment_id_to_execute = None
+            return True
+
+        return False
+
+
+container = EnvironmentContainer()
+
 def get_environment():
     """
     Creates an execution environment that represents the context in which the program is currently executed.
-
+
     :return:The execution environment of the context in which the program is executed.
     """
-    return Environment()
+    global container
```

— End diff —

It doesn't have to be declared global, since we don't assign to `container` within `get_environment()`.
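(For context, the Python scoping rule being referenced, shown with throwaway names:)

```
# Illustrative only: reading a module-level name needs no `global`
# declaration; it is required only when the function rebinds the name.
counter = 0

def read_counter():
    return counter  # fine: name lookup falls through to module scope

def bump_counter():
    global counter  # required: this function assigns to the module name
    counter += 1
```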

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98629326

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -46,6 +77,9 @@ def __init__(self):
     self._local_mode = False
     self._retry = 0

+    self.container = container
```

— End diff —

This should be called `self._container`.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98629271

— Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py —

```
@@ -46,6 +77,9 @@ def __init__(self):
     self._local_mode = False
     self._retry = 0

+    self.container = container
+    self.env_id = env_id
```

— End diff —

This should be called `self._env_id`.

ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3232#discussion_r98631554

— Diff: flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java —

```
@@ -94,6 +95,7 @@
 	private HashMap<Integer, Object> sets = new HashMap<>();
 	public ExecutionEnvironment env;
+	private int environmentCounter = 0;
```

— End diff —

We should explicitly send the ID from Python as a parameter, like the parallelism. Right now we have to make sure that both Python and Java generate IDs the same way.
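For illustration, a hedged sketch of the receiving side, mirroring how the parallelism is already read via `streamer.getRecord(true)` in PythonOperationInfo; the method and field names here are illustrative, not the actual code:

```
// Sketch only (fragment from a hypothetical PythonOperationInfo):
// Python sends its environment ID explicitly as a record, so Java never
// has to reproduce Python's counter logic.
private void readIds(PythonPlanStreamer streamer) throws IOException {
	parallelism = (Integer) streamer.getRecord(true);
	envID = (Integer) streamer.getRecord(true); // generated once, on the Python side
}
```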

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on the issue:

          https://github.com/apache/flink/pull/3232

          Thanks for the comments, I've addressed them with the latest commit.

ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3232

Yes, please continue to comment after pushing changes.

          I'll have to try this out on a cluster to be certain, but from the looks of it this is good to merge.

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on the issue:

          https://github.com/apache/flink/pull/3232

In the process of getting a more complex job to run, I've tested a basic multi-job file (seen below) on an Amazon EMR YARN cluster, and it runs successfully.

```
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, WriteMode
from flink.functions.GroupReduceFunction \
    import GroupReduceFunction
from flink.functions.Aggregation import Sum

import sys

if __name__ == "__main__":
    output_file = 'hdfs:/tmp/out.txt'
    output_file2 = 'hdfs:/tmp/out2.txt'

    env = get_environment()
    data = env.from_elements((0, 1), (1, 2), (2, 3), (2, 10))

    data \
        .group_by(0) \
        .aggregate(Sum, 1) \
        .write_csv(output_file, write_mode=WriteMode.OVERWRITE)
    env.execute()

    env2 = get_environment()
    data2 = env2.read_csv(output_file, (INT, INT))

    data2 \
        .first(2) \
        .write_text(output_file2, write_mode=WriteMode.OVERWRITE)

    env2.execute()
```

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on the issue:

          https://github.com/apache/flink/pull/3232

It may have worked with a smaller file, but there may be issues with heavier jobs. When I ran a more computationally intensive and time-consuming job, the first job of the Python file ran successfully. The second job of the file was then submitted:
          ```
          <snip>
          02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
          02/09/2017 16:39:43 Job execution switched to status FINISHED.
          2017-02-09 16:40:26,470 INFO org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
          Waiting until all TaskManagers have connected
          2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
          TaskManager status (5/5)
          2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
          All TaskManagers are connected
          2017-02-09 16:40:26,480 INFO org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
          Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
          Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]
          ```

          However, Flink does not receive or respond to this new job. Instead, the client terminates with a timeout error:
          ```
          Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
          at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
          at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
          at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
          at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
          at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
          ```

          I tried setting `akka.client.timeout` to 20 minutes, but Flink is still not receiving the second job. I suspect this may be an issue with this patch.

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on the issue:

          https://github.com/apache/flink/pull/3232

          > It may have worked with a smaller file, but there may be issues with heavier jobs.

How silly of me. This problem had nothing to do with this pull request, with YARN, with issues in Flink, or with the size of the input file at all. I was using `ExecutionEnvironment.from_elements` to generate a large sequence of indexed zeroes to fill in the gaps of another indexed DataSet. With larger input files I set larger parameters and generated larger zero sequences, and because I was using `from_elements`, the client had to send all of those values (lots and lots of zeroes) to the runtime, which was very time-consuming and caused the timeout. I have replaced this with a `generate_sequence` call and a map function, which does not require sending lots of values from the client to the runtime, and the job (and this pull request) seem to work just fine.

          (change in question: https://github.com/quinngroup/pyflink-r1dl/commit/00a16d564bfad21fc1f4958677ada0a95fa9f088)
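For illustration, the shape of that change in the Python DataSet API; this is a hedged sketch with made-up sizes and a made-up `ZeroEntry` mapper, not the code from the linked commit:

```
from flink.plan.Environment import get_environment
from flink.functions.MapFunction import MapFunction


class ZeroEntry(MapFunction):
    """Build each (index, 0) pair on the workers instead of the client."""
    def map(self, value):
        return (value, 0)


env = get_environment()

# Before (slow): every tuple is serialized in the client and shipped to
# the runtime as part of the submitted plan.
# zeros = env.from_elements(*[(i, 0) for i in range(1000000)])

# After (fast): only the sequence bounds travel with the plan; the
# tuples are materialized in parallel on the cluster.
zeros = env.generate_sequence(0, 999999).map(ZeroEntry())
```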

ASF GitHub Bot added a comment -

          Github user GEOFBOT commented on the issue:

          https://github.com/apache/flink/pull/3232

          Resolved merge conflicts

ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3232

I will now try this out and merge it if it works.

Chesnay Schepler added a comment -

          2ce080da627cbc77527b69bfb877d4904e9a0701

ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3232

Geoffrey Mon added a comment -

          Thanks for your help!


People

• Assignee: Geoffrey Mon
• Reporter: Geoffrey Mon
• Votes: 0
• Watchers: 2