Apache NiFi / NIFI-3216

Add ability to wait for N signals to Wait/Notify processors

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.0
    • Component/s: None
    • Labels: None

      Description

      The recently added Wait and Notify processors allow a flow file to be held at the Wait processor until a signal is received in the Notify processor. It would be nice to be able to wait for N signals before releasing.

      One way this could be done is to have a property like "Signal Count" on the Wait processor, then count the keys in the cache that start with some pattern, and release when the number of keys equals the signal count.

      This would require the ability to get all the keys from the cache, or at least get all keys matching a pattern:

      https://issues.apache.org/jira/browse/NIFI-3214
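
      As a rough sketch of that key-counting idea (assuming a hypothetical cache client that can return all keys, i.e. the capability NIFI-3214 asks for; as the comments below show, this is not the approach that was ultimately merged):

      import java.util.Set;

      // Hypothetical helper illustrating the "Signal Count" idea: count the
      // cache keys that start with a signal prefix and release the waiting
      // flow file once the configured number of signals has been seen.
      class SignalCountCheck {
          static boolean shouldRelease(Set<String> cacheKeys, String signalPrefix, int signalCount) {
              long matches = cacheKeys.stream()
                      .filter(k -> k.startsWith(signalPrefix))
                      .count();
              return matches >= signalCount;
          }
      }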


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1420

          jira-bot ASF subversion and git services added a comment -

          Commit 7f0171ffa25bfd8932e49b3367049b101799dea4 in nifi's branch refs/heads/master from Koji Kawamura
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=7f0171f ]

          NIFI-3216: Add N signals to Wait/Notify

          • Support counters at Wait/Notify processors so that NiFi flow can be
            configured to wait for N signals
          • Extract Wait/Notify logic into WaitNotifyProtocol
          • Added FragmentAttributes to manage commonly used fragment attributes
          • Changed existing split processors to set 'fragment.identifier' and
            'fragment.count', so that Wait can use those to wait for all splits
            to be processed

          This closes #1420.

          Signed-off-by: Bryan Bende <bbende@apache.org>

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1420

          @ijokarumawak this is really awesome stuff! Tested this out and working nicely, code looks great. I made a couple of small tweaks to the documentation on the processors, going to merge to master shortly. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1420

          Reviewing...

          githubbot ASF GitHub Bot added a comment -

          GitHub user ijokarumawak opened a pull request:

          https://github.com/apache/nifi/pull/1420

          NIFI-3216: Add N signals to Wait/Notify

          *NOTE* This PR is based on [NIFI-3214: Added fetch and replace to DistributedMapCache](https://github.com/apache/nifi/pull/1410), which is still under review. Please review and comment on NIFI-3214 for the DistributedCache implementation. This PR focuses on the Wait/Notify processors, providing a waiting mechanism for multiple signals.

          • Support counters at Wait/Notify processors so that NiFi flow can be configured to wait for N signals
          • Extract Wait/Notify logic into WaitNotifyProtocol
          • Added FragmentAttributes to manage commonly used fragment attributes
          • Changed existing split processors to set 'fragment.identifier' and 'fragment.count', so that Wait can use those to wait for all splits to be processed

          A NiFi template file for testing this new capability is available in this [Gist](https://gist.github.com/ijokarumawak/dc81cbfa04bc7ec39e89522653db5ea5). It contains Process Groups showing how each updated split processor works with the Wait/Notify processors, as shown below:

          ![](https://gist.githubusercontent.com/ijokarumawak/dc81cbfa04bc7ec39e89522653db5ea5/raw/f0259717182d589b542cde1f40c166ffb6b75bec/process-groups.png)

          ![](https://gist.githubusercontent.com/ijokarumawak/dc81cbfa04bc7ec39e89522653db5ea5/raw/ef2f58c7eee8734be41bf4b5e9177a3c83412931/split-text.png)

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

              1. For all changes:
          • [x] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [ ] Is your initial contribution a single, squashed commit?
              2. For code changes:
          • [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [x] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              3. For documentation related changes:
          • [x] Have you ensured that format looks appropriate for the output in which it is rendered?
              4. Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ijokarumawak/nifi nifi-3216

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1420.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1420


          commit 42cf830c7101a33c54689afd01a864c961de95be
          Author: Koji Kawamura <ijokarumawak@apache.org>
          Date: 2017-01-11T06:57:40Z

          NIFI-3214: Added fetch and replace to DistributedMapCache

          • Using fetch and replace together can provide optimistic locking for
            concurrency control.
          • Added fetch to get a cache entry with its metadata, such as revision
            number.
          • Added replace to update cache only if it has not been updated.
          • Added Map Cache protocol version 2 for those new operations.
          • Existing operations such as get or put can work with protocol version
            1.

          commit 05aa3214150a9f2abb712af1e46c5cde70a0322c
          Author: Koji Kawamura <ijokarumawak@apache.org>
          Date: 2017-01-13T07:52:30Z

          NIFI-3216: Add N signals to Wait/Notify

          • Support counters at Wait/Notify processors so that NiFi flow can be
            configured to wait for N signals
          • Extract Wait/Notify logic into WaitNotifyProtocol
          • Added FragmentAttributes to manage commonly used fragment attributes
          • Changed existing split processors to set 'fragment.identifier' and
            'fragment.count', so that Wait can use those to wait for all splits
            to be processed

          bende Bryan Bende added a comment -

          Joseph Gresock yes that is how I envisioned it working, same signal attribute on all the signal flow files and just waiting for N of them.

          jgresock Joseph Gresock added a comment -

          I like all of these additions, and concur that they should still capture the original single-key use case. Just so I'm clear, the key would still just be a value, not a regex? i.e., you'd have to receive N flow files with the exact same attribute value in order to increment the counter N times for that key?

          ijokarumawak Koji Kawamura added a comment -

          Yes, and we also need an optional property in the Wait processor to specify which count to check; if it is not specified, the total count should be used.

          bende Bryan Bende added a comment -

          Koji Kawamura Great idea! That would be very helpful to identify why the expected number of signals wasn't reached. We would also need that optional property in the Wait processor so it knew which count to check, right?

          So in your example Wait would have something like "Signal Count" set to 1,000 and "Signal Count Name" set to "success" (of course these could be expression language references too), then there would be one or more Notify processors with one of them also having "Signal Count Name" set to "success".

          The Wait processor also has a concept of expiring the wait based on the Expiration Duration in the processor, so maybe it can log all the counts when something is expired so that it is easy to see why it didn't get successfully released.
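
          For concreteness, a configuration following this comment might look like the listing below (the property names and ${batch.id} are illustrative, taken from the discussion rather than from the final processors):

          Wait
            Release Signal Identifier : ${batch.id}
            Signal Count              : 1000
            Signal Count Name         : success
            Expiration Duration       : 10 min

          Notify (on the success path)
            Release Signal Identifier : ${batch.id}
            Signal Count Name         : success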

          ijokarumawak Koji Kawamura added a comment -

          Bryan Bende I agree with the idea of adding an atomic replace method, instead of making the cache engine capable of returning all keys or querying keys by pattern, which can be costly or impractical with some cache engines, especially distributed ones.

          I think it'd also be beneficial to add 'result state' semantics to the count.
          For example, suppose there are 1,000 entries in the original input; these are split and processed in a downstream flow, and some can succeed while others fail. In this partial-success scenario, users may want to route the subsequent flow based on the failure rate.
          This can be done with the following JSON:

          {
            "counts" : {
              "success": 995,
              "failure": 5
            },
            "attributes" : {
               "attr1" : "val1",
               "attr2" : "val2"
            }
          }
          

          That way the Wait processor can check whether the total count reaches the desired count, and the entry also carries more context for further processing. The names of the counts can be anything, not limited to a success/failure distinction. An optional property on the Notify processor would specify the name of the count to increment.
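
          As a rough sketch of the check the Wait processor would perform, assuming the cached JSON has already been parsed into a map of counts (the names here are illustrative):

          import java.util.Map;

          // If a count name is configured, compare that counter to the target;
          // otherwise compare the sum of all counters (the total count).
          class WaitCheck {
              static boolean reached(Map<String, Long> counts, String countName, long target) {
                  final long observed = (countName == null || countName.isEmpty())
                          ? counts.values().stream().mapToLong(Long::longValue).sum()
                          : counts.getOrDefault(countName, 0L);
                  return observed >= target;
              }
          }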

          bende Bryan Bende added a comment -

          Final note... as part of this, it would also be nice to update the "split" processors we have to add the total number of splits as an attribute to the original flow file. For example, SplitJSON adds "fragment.count" to each split flow file so that MergeContent can "defragment" them later, but it would be nice to send the original JSON to a Wait processor with the "Signal Count" set to ${fragment.count} and then send all the splits to a Notify processor, so that the original JSON is released when all splits are done processing.
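
          Sketched as a flow (the fragment attribute names are from this comment; the Wait/Notify property names are illustrative):

          SplitJSON
            splits   --> ...processing... --> Notify  (signal id: ${fragment.identifier})
            original --> Wait                         (signal id: ${fragment.identifier},
                                                       Signal Count: ${fragment.count})

          Wait releases the original flow file once Notify has fired fragment.count times for that identifier.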

          bende Bryan Bende added a comment -

          Also, I had originally linked NIFI-3214 to this ticket because I was thinking we needed that change first. We can either turn that ticket into the ticket for adding the replace method to the cache, or we can just unlink this ticket from NIFI-3214 and do all the work under this ticket.

          bende Bryan Bende added a comment -

          I was hoping to work on this ticket, but I'm not sure I will end up having time, so I wanted to document the design I've been thinking about over the last week...

          Originally I was thinking that each signal would be its own entry in the cache, and Wait would get all the keys and determine whether the number of keys matching the pattern equaled the signal count, and if so, release. The downside to this approach is that it could require a very large number of cache entries. Consider the case where you run SplitText on a 1-million-row CSV and want to do something with the original CSV after all 1 million rows have been processed; that would require 1 million entries in the cache.

          A better approach would probably be to use a single key (as it does now) and make the value of that key a JSON document (or some structured format) that contains the count of signals for that key, as well as a map of metadata attributes. In order to do this, we need to modify the DistributedMapCache to support a replace method that takes the key, the expected current value, and the new value to replace it with. The cache server will perform the replace and return true if the stored value matches the expected current value; otherwise the replace won't be performed and false will be returned.

          So the DistributedMapCacheClient would have methods like:

          <K, V> boolean replace(K key, V currentValue, V replaceValue, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;

          boolean isReplaceSupported();

          The isReplaceSupported() method can be used by a cache provider to indicate that this operation is not supported, in the event that we eventually provide additional back-end caches, and the Wait/Notify processors could use it to perform custom validation, ensuring the processors are only valid when a cache with replace capability is selected.

          So let's say the signal key is "my.signal" and we are going to store the value as JSON, like:

          {
            "count" : 0,
            "attributes" : {
               "attr1" : "val1",
               "attr2" : "val2"
            }
          }
          

          When a signal hits the Notify processor, it would try to retrieve the value of "my.signal" from the cache. If nothing is found, it creates the first version of the JSON with a count of 1 and any attributes (based on the regex) and stores it in the cache. If an entry was found, it would increment the count in the JSON, add in any attributes, and then call replace with the original JSON and the new JSON. If the replace fails, it is because something else updated the cache between the original retrieval and the replace, so it would repeat the process (get the current value again and attempt the replace).
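
          A minimal sketch of that get-then-replace loop, written against a hypothetical client interface combining the existing get/putIfAbsent operations with the proposed replace (the JSON handling is stubbed out):

          import java.io.IOException;

          interface CacheClient {
              String get(String key) throws IOException;
              boolean putIfAbsent(String key, String value) throws IOException;
              // Proposed operation: swap only if the stored value still equals currentValue.
              boolean replace(String key, String currentValue, String replaceValue) throws IOException;
          }

          class NotifySketch {
              static void signal(CacheClient cache, String key) throws IOException {
                  while (true) {
                      final String current = cache.get(key);
                      if (current == null) {
                          // First signal for this key: store the initial JSON with count = 1.
                          if (cache.putIfAbsent(key, "{\"count\":1,\"attributes\":{}}")) {
                              return;
                          }
                          continue; // another Notify created the entry first; re-read and retry
                      }
                      final String updated = incrementCount(current);
                      if (cache.replace(key, current, updated)) {
                          return; // no concurrent update; signal recorded
                      }
                      // replace failed: the entry changed between get and replace; retry
                  }
              }

              static String incrementCount(String json) {
                  // Placeholder for: parse JSON, increment "count", merge attributes, re-serialize.
                  return json;
              }
          }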

          One other thing to consider is the uniqueness of attributes (i.e., if there are 1,000 signals and they all have attribute 'foo'). I am proposing that when using these processors in this multi-signal mode, we keep it simple and just merge together all the attributes, so you would end up with the value of the attribute from the last signal.


            People

            • Assignee: ijokarumawak Koji Kawamura
            • Reporter: bende Bryan Bende
            • Votes: 0
            • Watchers: 5
