Uploaded image for project: 'Apache NiFi'
  1. Apache NiFi
  2. NIFI-8136

Allow State Management to be tied to Process Session

    XMLWordPrintableJSON

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.13.0
    • Component/s: Core Framework
    • Labels:
      None

      Description

      We have many processors currently that store state using NiFi's built-in state management capabilities. To do this, processors need to do something like:

      Map<String, String> state = new HashMap<>();
      state.put("key", "value1");
      state.put("key2", "value2");
      
      if (flowFile != null) {
          ...
          state.put("key2", updatedValue2);
      }
      
      session.commit();
      context.getStateManager().setState(state, Scope.LOCAL);

      Which is not a terrible API but comes with a few downfalls.

      If using a processor that has the @SupportsBatching annotation, calls to ProcessContext.getStateManager().getState(Scope) can be costly to invoke for each FlowFile. To avoid this, processors typically end up having to cache the values themselves.

      Depending on the code, management of the state map can be difficult and there's no ability to rollback the state changes once applied because the call to setState() immediately updates the remote state.

      If executing within a different context, in which we want to store state atomically with the FlowFiles that resulted in the state change, there's no way to do that currently.

      To overcome these problems, we should allow for setting, getting, clearing, and replacing state to be done via the ProcessSession, in addition to the State Manager. I.e., a Processor developer may do either of:

      context.getStateManager().setState(...);

      Or

      session.setState(...); 

      The former would behave as it does now, immediately updating state on the remote system (zookeeper, for example). The latter would simply update an in-memory copy of the state in the Process Session. When ProcessSession.commit() is called, it would push the new state to the remote system. If the session is rolled back, it would simply not update the state. This allows the state to be set in the middle of the processor's algorithm, rather than requiring that it be held onto until after session commit is successful. If the session is then checkpointed (via session.commit while running with a Run Duration greater than 0 ms), then the Session Checkpoint will keep the state. Rolling back the session but not the checkpoint would then result in the checkpointed state still be pushed out.

       

       

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                markap14 Mark Payne
                Reporter:
                markap14 Mark Payne
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 40m
                  40m