  Apache NiFi / NIFI-3545

Let M FlowFiles pass through once N signals arrive


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: None
    • Fix Version/s: 1.2.0
    • Component/s: Extensions
    • Labels: None

    Description

      If the Wait processor could
      "Let M flow files pass through once N notify signals have arrived for key K",
      it would support a wider variety of use cases. Currently, it only supports
      "Let 1 flow file pass through once N notify signals have arrived for key K".

      How it works: a simulation

      For example, say there are 50 incoming flow files at the beginning, f1 to f50,
      with N=3 and M=100.
      This can be read as: "the Wait processor may convert 3 signals into 100 pass tickets."

      1. There is no signal for K, so all flow files wait.
      2. Notify sends a signal. K(N=1) doesn't meet the Wait condition, so the Wait processor keeps waiting.
      3. Notify sends two more signals. Now K(N=3) matches the Wait condition.
      4. The Wait processor converts the 3 signals into 100 passes and releases f1 to f50, updating K to K(N=0, M=50), where M denotes the remaining number of flow files that can pass through.
      5. Another 30 flow files arrive. The Wait processor releases f51 to f80 and updates K to K(N=0, M=20).
      6. Another 30 flow files arrive. The Wait processor releases f81 to f100, leaving K(N=0, M=0). Since both N and M are used up, the Wait processor removes K. f101 to f110 wait for signals, the same state as step 1.
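The simulation above can be replayed with a small model. This is a minimal Python sketch, not NiFi's implementation: `SignalState`, `WaitModel`, `drain` and `names` are hypothetical names, a plain dict stands in for the distributed cache, and only the counting rule described in this issue is modeled (when M runs out, N signals are converted into M passes; K is removed once both reach 0).

```python
from dataclasses import dataclass, replace


@dataclass
class SignalState:
    """Hypothetical per-key state: n = unconverted signals, m = remaining passes."""
    n: int = 0
    m: int = 0


class WaitModel:
    """Hypothetical model of the proposed Wait rule; not NiFi's API."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n          # N: signals consumed per conversion
        self.releasable_m = releasable_m  # M: passes created per conversion
        self.cache = {}                   # stands in for the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if state.m == 0:
            if state.n < self.target_n:
                return False              # not enough signals yet: keep waiting
            state.n -= self.target_n      # convert N signals ...
            state.m = self.releasable_m   # ... into M passes
        state.m -= 1                      # spend one pass on this flow file
        if state.n == 0 and state.m == 0:
            del self.cache[key]           # everything used up: remove K
        return True


def drain(model, key, queue):
    """Release queued flow files in order until the model says to wait."""
    released = []
    while queue and model.try_release(key):
        released.append(queue.pop(0))
    return released


def names(first, last):
    return [f"f{i}" for i in range(first, last + 1)]


model = WaitModel(target_n=3, releasable_m=100)
queue = names(1, 50)

step1 = drain(model, "K", queue)          # no signal for K: nothing released
model.notify("K")                         # step 2: K(N=1)
step2 = drain(model, "K", queue)          # still below N=3: nothing released
model.notify("K", 2)                      # step 3: K(N=3)
step4 = drain(model, "K", queue)          # f1..f50 released
after_step4 = replace(model.cache["K"])   # copy of K(N=0, M=50)
queue += names(51, 80)
step5 = drain(model, "K", queue)          # f51..f80 released
after_step5 = replace(model.cache["K"])   # copy of K(N=0, M=20)
queue += names(81, 110)
step6 = drain(model, "K", queue)          # f81..f100 released, K removed
print(after_step4, after_step5, "K" in model.cache, queue[0], len(queue))
# SignalState(n=0, m=50) SignalState(n=0, m=20) False f101 10
```

Keeping the leftover passes (M) next to the unconverted signals (N) is what lets the later branches of this simulation reuse unspent passes before converting new signals.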

      Alternative paths after step 6

      7a. If Notify sends enough additional signals (N=3), f101 to f110 can pass through.
      7b. If Notify sends no more signals, f101 to f110 will be routed to expired.
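Paths 7a and 7b can be sketched by adding an expiration check to the same hypothetical model. The class and function names, the `expiration` value of 300 seconds and the timestamps are illustrative assumptions. This sketch checks expiration before trying to spend a pass; that ordering is a choice of the sketch, not a statement about NiFi's Wait processor.

```python
from dataclasses import dataclass


@dataclass
class SignalState:
    n: int = 0  # unconverted signals for the key
    m: int = 0  # remaining passes for the key


class WaitModel:
    """Hypothetical model: N signals buy M passes; waiting too long means expired."""

    def __init__(self, target_n, releasable_m, expiration):
        self.target_n = target_n
        self.releasable_m = releasable_m
        self.expiration = expiration  # hypothetical expiration duration, in seconds
        self.cache = {}               # stands in for the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if state.m == 0:
            if state.n < self.target_n:
                return False
            state.n -= self.target_n      # convert N signals into M passes
            state.m = self.releasable_m
        state.m -= 1
        if state.n == 0 and state.m == 0:
            del self.cache[key]
        return True

    def process(self, key, queue, now):
        """Route queued (name, arrived_at) pairs; returns (released, expired)."""
        released, expired, waiting = [], [], []
        for name, arrived_at in queue:
            if now - arrived_at >= self.expiration:
                expired.append(name)      # waited too long: route to expired
            elif self.try_release(key):
                released.append(name)     # a pass was available
            else:
                waiting.append((name, arrived_at))
        queue[:] = waiting                # whatever is left keeps waiting
        return released, expired


def leftover():
    # f101..f110 after step 6: K has been removed; all arrived at t=0.
    return [(f"f{i}", 0) for i in range(101, 111)]


# 7a: Notify sends three more signals before the flow files expire.
model_7a = WaitModel(target_n=3, releasable_m=100, expiration=300)
queue_7a = leftover()
model_7a.notify("K", 3)
released_7a, expired_7a = model_7a.process("K", queue_7a, now=60)

# 7b: no more signals; at t=400 the flow files have waited past 300 seconds.
model_7b = WaitModel(target_n=3, releasable_m=100, expiration=300)
queue_7b = leftover()
released_7b, expired_7b = model_7b.process("K", queue_7b, now=400)
print(len(released_7a), model_7a.cache["K"], len(expired_7b))
# 10 SignalState(n=0, m=90) 10
```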

      Alternative paths after step 5

      6a. If Notify sends one additional signal at this point, K becomes K(N=1, M=20). The Wait processor can still release 20 flow files because M=20 remains.
      6b. If Notify sends three additional signals, K becomes K(N=3, M=20). The Wait processor releases 20 flow files, and when the 21st flow file arrives, it immediately converts N to M, consuming the 3 signals to create 100 passes: K(N=0, M=100).
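Both branches can be checked with the same kind of hypothetical model, starting from the state reached at step 5, K(N=0, M=20). The names below are illustrative, not NiFi's API; the point is that existing passes (M) are spent before any new signals (N) are converted.

```python
from dataclasses import dataclass, replace


@dataclass
class SignalState:
    n: int = 0  # unconverted signals
    m: int = 0  # remaining passes


class WaitModel:
    """Hypothetical model of the proposed rule; not NiFi's API."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n
        self.releasable_m = releasable_m
        self.cache = {}  # stands in for the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if state.m == 0:
            if state.n < self.target_n:
                return False
            state.n -= self.target_n      # convert N signals into M passes
            state.m = self.releasable_m
        state.m -= 1
        if state.n == 0 and state.m == 0:
            del self.cache[key]
        return True


def drain(model, key, queue):
    released = []
    while queue and model.try_release(key):
        released.append(queue.pop(0))
    return released


def names(first, last):
    return [f"f{i}" for i in range(first, last + 1)]


def after_step5():
    model = WaitModel(target_n=3, releasable_m=100)
    model.cache["K"] = SignalState(n=0, m=20)  # state reached at step 5
    return model


# 6a: one extra signal -> K(N=1, M=20). The 20 passes are spent; N=1 is kept.
model_a = after_step5()
model_a.notify("K")
queue_a = names(81, 110)
released_a = drain(model_a, "K", queue_a)      # f81..f100; K(N=1, M=0)

# 6b: three extra signals -> K(N=3, M=20).
model_b = after_step5()
model_b.notify("K", 3)
queue_b = names(81, 100)
drain(model_b, "K", queue_b)                   # 20 passes spent: K(N=3, M=0)
before_21st = replace(model_b.cache["K"])
queue_b.append("f101")                         # the 21st flow file arrives
released_21st = drain(model_b, "K", queue_b)   # converts to K(N=0, M=100), spends one
print(model_a.cache["K"], before_21st, model_b.cache["K"])
# SignalState(n=1, m=0) SignalState(n=3, m=0) SignalState(n=0, m=99)
```

After the 21st flow file is released, K(N=0, M=100) becomes K(N=0, M=99), since one of the new passes has been spent.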

      Additionally, we can let users configure M=0, meaning Wait can release any number of incoming flow files as long as N meets the condition.
      With this, Notify +1 can act as if it opens a gate, and Notify -1 closes it.
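The M=0 gate mode can be sketched as a special case of the same hypothetical model: with M=0 the signal count is only compared against N and never consumed, so Notify +1 opens the gate and Notify -1 closes it. All names here are illustrative, not NiFi's API.

```python
from dataclasses import dataclass


@dataclass
class SignalState:
    n: int = 0  # signal count for the key
    m: int = 0  # remaining passes (unused when releasable_m == 0)


class WaitModel:
    """Hypothetical model of the proposed rule, including the M=0 gate mode."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n
        self.releasable_m = releasable_m
        self.cache = {}  # stands in for the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if self.releasable_m == 0:
            # Gate mode: release any number of flow files while N meets the
            # target; signals are never consumed.
            return state.n >= self.target_n
        if state.m == 0:
            if state.n < self.target_n:
                return False
            state.n -= self.target_n
            state.m = self.releasable_m
        state.m -= 1
        if state.n == 0 and state.m == 0:
            del self.cache[key]
        return True


def drain(model, key, queue):
    released = []
    while queue and model.try_release(key):
        released.append(queue.pop(0))
    return released


gate = WaitModel(target_n=1, releasable_m=0)
closed = drain(gate, "K", ["a", "b"])          # no signal yet: gate closed
gate.notify("K", +1)                           # Notify +1 opens the gate
opened = drain(gate, "K", [f"x{i}" for i in range(500)])
gate.notify("K", -1)                           # Notify -1 closes it again
closed_again = drain(gate, "K", ["c", "d"])
print(len(closed), len(opened), len(closed_again))  # 0 500 0
```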

      Another possible use case: 'Limit data flow rate cluster-wide'

      This is more complex than supporting a gate's open/close state. However, if Wait lets M flow files pass through per N signals, it can also provide a rate limit across the cluster.

      Example use case: NiFi A pushes data via S2S (Site-to-Site) to NiFi B and wants to limit it to 100 flow files per 5 minutes.

      On NiFi A:
      Notify part of the flow: GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
      Wait part of the flow: Some ingested data -> Wait(K, N=1, M=100)
      Since Wait/Notify state is managed globally via the DistributedCache, this limits throughput cluster-wide.
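A rough sketch of this rate-limit pattern, under the same hypothetical model: one shared dict stands in for the DistributedCache, the primary node's GenerateFlowFile adds one signal per 5-minute window, and two hypothetical nodes compete for the resulting passes. The loop is single-threaded, so it ignores the concurrent updates a real shared cache has to handle. The second scenario shows signals piling up during idle windows.

```python
from dataclasses import dataclass


@dataclass
class SignalState:
    n: int = 0  # unconverted signals
    m: int = 0  # remaining passes


class WaitModel:
    """Hypothetical model of the proposed rule; not NiFi's API."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n
        self.releasable_m = releasable_m
        self.cache = {}  # one dict shared by all nodes, like the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if state.m == 0:
            if state.n < self.target_n:
                return False
            state.n -= self.target_n
            state.m = self.releasable_m
        state.m -= 1
        if state.n == 0 and state.m == 0:
            del self.cache[key]
        return True


def simulate_windows(model, arrivals_per_window):
    """Return how many flow files the whole cluster releases in each window.

    arrivals_per_window holds (node1, node2) counts of newly ingested flow files.
    """
    backlogs = [0, 0]  # flow files waiting on each hypothetical node
    per_window = []
    for arrivals in arrivals_per_window:
        model.notify("K", +1)  # GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
        backlogs = [b + a for b, a in zip(backlogs, arrivals)]
        released = 0
        progress = True
        while progress:  # nodes take turns against the same shared counters
            progress = False
            for i in range(len(backlogs)):
                if backlogs[i] and model.try_release("K"):
                    backlogs[i] -= 1
                    released += 1
                    progress = True
        per_window.append(released)
    return per_window


busy = simulate_windows(WaitModel(target_n=1, releasable_m=100), [(80, 80), (80, 80)])
idle_then_burst = simulate_windows(
    WaitModel(target_n=1, releasable_m=100), [(0, 0), (0, 0), (0, 0), (150, 150)]
)
print(busy, idle_then_burst)  # [100, 100] [0, 0, 0, 300]
```

With steady traffic, the two nodes together release 100 flow files per window. After three idle windows, however, four signals have accumulated and the burst releases 300.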

      If a use case requires an exact rate limit, the Notify part can be designed as:
      GenerateFlowFile(5 min, on primary) -> Notify(K, N=0) -> Notify(K, N=+1)
      Notify(K, N=0) resets the count first, which keeps N from adding up while there is no traffic.
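The reset-then-increment pattern can be sketched with a hypothetical `reset` method that models Notify(K, N=0) by setting the signal count back to 0. As with the other sketches, the names are illustrative and a plain dict stands in for the distributed cache. In this sketch the reset only clears N, so passes already converted into M in an earlier window still carry over.

```python
from dataclasses import dataclass


@dataclass
class SignalState:
    n: int = 0  # unconverted signals
    m: int = 0  # remaining passes


class WaitModel:
    """Hypothetical model of the proposed rule; not NiFi's API."""

    def __init__(self, target_n, releasable_m):
        self.target_n = target_n
        self.releasable_m = releasable_m
        self.cache = {}  # stands in for the distributed cache

    def notify(self, key, delta=1):
        self.cache.setdefault(key, SignalState()).n += delta

    def reset(self, key):
        # Models Notify(K, N=0): the signal count goes back to 0.
        self.cache.setdefault(key, SignalState()).n = 0

    def try_release(self, key):
        state = self.cache.get(key)
        if state is None:
            return False
        if state.m == 0:
            if state.n < self.target_n:
                return False
            state.n -= self.target_n
            state.m = self.releasable_m
        state.m -= 1
        if state.n == 0 and state.m == 0:
            del self.cache[key]
        return True


def simulate_windows(model, arrivals_per_window, reset_first):
    """Return the number of flow files released in each 5-minute window."""
    backlog = 0
    per_window = []
    for arrivals in arrivals_per_window:
        if reset_first:
            model.reset("K")   # Notify(K, N=0)
        model.notify("K", +1)  # Notify(K, N=+1)
        backlog += arrivals
        released = 0
        while backlog and model.try_release("K"):
            backlog -= 1
            released += 1
        per_window.append(released)
    return per_window


windows = [0, 0, 0, 300]
plain = simulate_windows(WaitModel(1, 100), windows, reset_first=False)
with_reset = simulate_windows(WaitModel(1, 100), windows, reset_first=True)
print(plain, with_reset)  # [0, 0, 0, 300] [0, 0, 0, 100]
```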

            People

              Assignee: Koji Kawamura (ijokarumawak)
              Reporter: Koji Kawamura (ijokarumawak)
              Votes: 0
              Watchers: 3
