Flink / FLINK-6177

Add support for "Distributed Cache" in streaming applications

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels:
      None


        Activity

        Tzu-Li (Gordon) Tai added a comment:

        Hi,

        The "counterpart" of the distributed cache in the DataStream API is Side Inputs, which is already under heavy discussion and has an umbrella JIRA tracking progress: https://issues.apache.org/jira/browse/FLINK-6131.

        Could you close this one (since I would say it's a duplicate) and check out FLINK-6131 to see if there are any tasks there you want to take on? Thanks!

        Chesnay Schepler added a comment:

        The Distributed Cache is for transferring files, while side inputs are for adding an additional input DataStream.
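The difference is easiest to see from the consuming side: a distributed cache resolves a registered name to a file on the worker's local disk, whereas a side input feeds an additional stream of records into the operator. Below is a minimal, hypothetical plain-Java model of that file-shipping contract. `ToyDistributedCache`, `register` and `getFile` are illustrative names, not Flink's API (in Flink, user functions reach the cache through `RuntimeContext#getDistributedCache()`); a single JVM and a temp directory stand in for the cluster and a worker's local cache.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process model of a distributed cache (NOT Flink's API):
// the client registers a file under a name; a worker resolves that name to a local copy.
class ToyDistributedCache {
    private final Map<String, Path> registered = new HashMap<>();
    private final Path localCacheDir;

    ToyDistributedCache(Path localCacheDir) {
        this.localCacheDir = localCacheDir;
    }

    // Client side: remember where the file lives under a logical name.
    void register(String filePath, String name) {
        registered.put(name, Paths.get(filePath));
    }

    // Worker side: copy the registered file into the local cache directory and return its local path.
    Path getFile(String name) {
        Path source = registered.get(name);
        if (source == null) {
            throw new IllegalArgumentException("no cached file registered as " + name);
        }
        try {
            return Files.copy(source, localCacheDir.resolve(name), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class ToyCacheDemo {
    // Registers a temp file, resolves it "on a worker", and returns the lines of the local copy.
    static List<String> run() {
        try {
            Path original = Files.createTempFile("words", ".txt");
            Files.write(original, "flink\nstreaming\n".getBytes(StandardCharsets.UTF_8));
            ToyDistributedCache cache = new ToyDistributedCache(Files.createTempDirectory("local-cache"));
            cache.register(original.toString(), "words");
            return Files.readAllLines(cache.getFile("words"), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [flink, streaming]
    }
}
```

The function never receives the file's contents as stream elements; it only learns a local path and opens the file itself.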

        Tzu-Li (Gordon) Tai added a comment:

        Ah, yikes, sorry, I was thinking of "broadcast sets". Please ignore my comment, then.

        ASF GitHub Bot added a comment:

        GitHub user zohar-pm opened a pull request:

        https://github.com/apache/flink/pull/3741

        FLINK-6177 Add support for "Distributed Cache" in streaming applications

        Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
        If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
        In addition to going through the list, please provide a meaningful description of your changes.

        • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
        • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
        • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/zohar-pm/flink python-streaming-distributed-cache

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3741.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3741


        commit 03684e8e460143013babb2ec88c66c8fa1119c43
        Author: Zohar Mizrahi <zohar.mizrahi@parallelmachines.com>
        Date: 2017-04-09T09:11:57Z

        FLINK-6177 Add support for "Distributed Cache" in streaming applications


        ASF GitHub Bot added a comment:

        Github user StephanEwen commented on the issue:

        https://github.com/apache/flink/pull/3741

        Thanks for contributing this, the added functionality looks good.

        I would prefer to add this change without changing the dependencies and test base classes. You could for example change the test to throw an exception in the "validator function" if the word is not in the cache file. That way you do not need to "collect back" the data.

        Minor comment: generating the input from a collection rather than a file usually makes the tests a bit more lightweight. We try to do that in all newer tests.
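The testing approach suggested in this comment can be sketched as follows, assuming the cached file is a word list: the function consuming the input opens the cached file once and throws on any word it cannot find, so a run that completes is itself the assertion and nothing has to be collected back, and the input is built from an in-memory collection rather than read from a file. `WordValidator` and `ValidatorDemo` are hypothetical plain-Java stand-ins; in an actual Flink test the check would live inside a rich function executed on a mini cluster.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the test's "validator function": it loads the cached
// word list once and throws on the first unknown word, so nothing is collected back.
class WordValidator {
    private final Set<String> allowed;

    WordValidator(Path cachedFile) {
        this.allowed = readWords(cachedFile);
    }

    private static Set<String> readWords(Path file) {
        try {
            return new HashSet<>(Files.readAllLines(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors a map() call: pass the word through unchanged, or fail.
    String map(String word) {
        if (!allowed.contains(word)) {
            throw new IllegalStateException("word not in cache file: " + word);
        }
        return word;
    }
}

class ValidatorDemo {
    // Runs every input word through the validator; any exception fails the (simulated) job.
    static void runAgainst(List<String> input) {
        try {
            Path cacheFile = Files.createTempFile("cache", ".txt");
            Files.write(cacheFile, "flink\nstreaming\n".getBytes(StandardCharsets.UTF_8));
            WordValidator validator = new WordValidator(cacheFile);
            for (String word : input) {
                validator.map(word);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Input is built from an in-memory collection instead of an input file.
        runAgainst(Arrays.asList("flink", "streaming", "flink"));
        System.out.println("every word was found in the cache file");
    }
}
```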

        ASF GitHub Bot added a comment:

        Github user zohar-pm commented on the issue:

        https://github.com/apache/flink/pull/3741

        No problem. I will follow that and create a new pull request.

        ASF GitHub Bot added a comment:

        Github user zentol commented on the issue:

        https://github.com/apache/flink/pull/3741

        @zohar-pm You don't have to open a new one, feel free to update the branch in this one.

        ASF GitHub Bot added a comment:

        Github user StephanEwen commented on the issue:

        https://github.com/apache/flink/pull/3741

        Looks quite good now.

        If I can ask you for one more follow-up: to have faster tests, it would be good to add the streaming distributed cache test and the batch distributed cache test to the same file.

        Can you change the `DistributedCacheTest` to extend `StreamingMultipleProgramsTestBase` and put your test in there as well? That will cause only one distributed mini cluster to be spawned, which typically saves 1-2 secs in tests. May not seem much, but it adds up over the 1000s of tests Flink has by now...
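A plain-Java model of the saving described here, under the assumption that the expensive part is starting the cluster: the fixture is created once on first use and then reused by both tests, so the startup cost is paid once instead of once per test class. `SharedCluster` and `CombinedDistributedCacheTests` are hypothetical names; Flink's `StreamingMultipleProgramsTestBase` manages its own cluster lifecycle, which this sketch does not reproduce.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a shared test fixture: the expensive resource (standing in for a
// local mini cluster) is created lazily on first use and then reused by every test.
final class SharedCluster {
    static final AtomicInteger STARTS = new AtomicInteger();
    private static SharedCluster instance;

    private SharedCluster() {
        // Startup cost would be paid here (the review estimates 1-2 s per cluster).
        STARTS.incrementAndGet();
    }

    static synchronized SharedCluster get() {
        if (instance == null) {
            instance = new SharedCluster();
        }
        return instance;
    }
}

// Both distributed cache tests live in one class and share the same cluster.
class CombinedDistributedCacheTests {
    static SharedCluster batchTest() {
        SharedCluster cluster = SharedCluster.get();
        // ... submit the batch job that reads the cached file ...
        return cluster;
    }

    static SharedCluster streamingTest() {
        SharedCluster cluster = SharedCluster.get();
        // ... submit the streaming job that reads the cached file ...
        return cluster;
    }

    public static void main(String[] args) {
        boolean same = batchTest() == streamingTest();
        // prints "same cluster: true, starts: 1"
        System.out.println("same cluster: " + same + ", starts: " + SharedCluster.STARTS.get());
    }
}
```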

        ASF GitHub Bot added a comment:

        Github user zentol commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3741#discussion_r114503248

        --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
        @@ -1776,8 +1787,45 @@ public static void setDefaultLocalParallelism(int parallelism) {
             protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
                 contextEnvironmentFactory = ctx;
             }
        -
        +
             protected static void resetContextEnvironment() {
                 contextEnvironmentFactory = null;
             }
        +
        +    /**
        +     * Registers a file at the distributed cache under the given name. The file will be accessible
        +     * from any user-defined function in the (distributed) runtime under a local path. Files
        +     * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
        +     * The runtime will copy the files temporarily to a local cache, if needed.
        +     * <p>
        +     * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
        +     * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
        +     * {@link org.apache.flink.api.common.cache.DistributedCache} via
        +     * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
        +     *
        +     * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
        +     * @param name The name under which the file is registered.
        +     */
        +    public void registerCachedFile(String filePath, String name){
        +        registerCachedFile(filePath, name, false);
        +    }
        +
        +    /**
        +     * Registers a file at the distributed cache under the given name. The file will be accessible
        +     * from any user-defined function in the (distributed) runtime under a local path. Files
        +     * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
        +     * The runtime will copy the files temporarily to a local cache, if needed.
        +     * <p>
        +     * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
        +     * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
        +     * {@link org.apache.flink.api.common.cache.DistributedCache} via
        +     * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
        +     *
        +     * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
        +     * @param name The name under which the file is registered.
        +     * @param executable flag indicating whether the file should be executable
        +     */
        +    public void registerCachedFile(String filePath, String name, boolean executable){
        --- End diff --

        missing space before `}`.
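The two overloads in this diff follow a common convenience-overload pattern: the two-argument form fixes `executable` to `false` and delegates to the three-argument form. The sketch below reproduces only that delegation in a hypothetical `ToyEnvironment`; keeping entries in a `LinkedHashMap` keyed by name (so a later registration under the same name replaces an earlier one) is an assumption of this sketch, since the diff excerpt stops before the method body.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry entry: where the file lives and whether it must be executable.
final class CacheEntry {
    final String filePath;
    final boolean executable;

    CacheEntry(String filePath, boolean executable) {
        this.filePath = filePath;
        this.executable = executable;
    }
}

// Hypothetical environment fragment mirroring the two overloads in the diff.
class ToyEnvironment {
    private final Map<String, CacheEntry> cachedFiles = new LinkedHashMap<>();

    // Convenience overload: registers a non-executable file.
    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, false);
    }

    // Full overload: records the file under its name together with the executable flag.
    public void registerCachedFile(String filePath, String name, boolean executable) {
        cachedFiles.put(name, new CacheEntry(filePath, executable));
    }

    // Read-only view of everything registered so far.
    Map<String, CacheEntry> getCachedFiles() {
        return Collections.unmodifiableMap(cachedFiles);
    }
}
```

For example, `env.registerCachedFile("hdfs://host:port/and/path", "words")` records a non-executable entry, while passing `true` as the third argument marks a file such as a script as executable.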

        ASF GitHub Bot added a comment:

        Github user zentol commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3741#discussion_r114503212

        --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
        @@ -1776,8 +1787,45 @@ public static void setDefaultLocalParallelism(int parallelism) {
             protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
                 contextEnvironmentFactory = ctx;
             }
        -
        +
             protected static void resetContextEnvironment() {
                 contextEnvironmentFactory = null;
             }
        +
        +    /**
        +     * Registers a file at the distributed cache under the given name. The file will be accessible
        +     * from any user-defined function in the (distributed) runtime under a local path. Files
        +     * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
        +     * The runtime will copy the files temporarily to a local cache, if needed.
        +     * <p>
        +     * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
        +     * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
        +     * {@link org.apache.flink.api.common.cache.DistributedCache} via
        +     * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
        +     *
        +     * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
        +     * @param name The name under which the file is registered.
        +     */
        +    public void registerCachedFile(String filePath, String name){
        --- End diff --

        missing space before `{`.

        ASF GitHub Bot added a comment:

        Github user zentol commented on the issue:

        https://github.com/apache/flink/pull/3741

        Merging. Will add the missing space while I'm doing it.

        ASF GitHub Bot added a comment:

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3741

        Chesnay Schepler added a comment:

        1.3: 3655dee5b5feee46eaadeaae221fa8f358b90340


    People

    • Assignee: Zohar Mizrahi
    • Reporter: Zohar Mizrahi
    • Votes: 0
    • Watchers: 3
