Many processors currently store state using NiFi's built-in state management capabilities. To do this, a processor needs to do something like:
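A minimal sketch of that read-modify-write pattern, under stated assumptions: `InMemoryStateManager` and `CurrentPatternSketch` are hypothetical stand-ins so that the example runs without NiFi on the classpath. The method names mirror the `getState` and `setState` calls mentioned in this ticket, the `Scope` argument is left out, and the key `last.seen.id` is made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's StateManager; state is kept in memory
// instead of a remote store, and the Scope argument is omitted for brevity.
class InMemoryStateManager {
    private Map<String, String> remote = new HashMap<>();
    int remoteWrites = 0; // counts how often the "remote" store is updated

    // Returns a read-only snapshot of the stored state.
    Map<String, String> getState() {
        return Collections.unmodifiableMap(new HashMap<>(remote));
    }

    // Replaces the stored state immediately; there is no way to undo this.
    void setState(Map<String, String> newState) {
        remote = new HashMap<>(newState);
        remoteWrites++;
    }
}

class CurrentPatternSketch {
    // Read-modify-write: fetch the snapshot, copy it, change it, store it back.
    static void recordLastSeen(InMemoryStateManager stateManager, String lastSeenId) {
        Map<String, String> current = stateManager.getState();
        Map<String, String> updated = new HashMap<>(current); // snapshot is read-only
        updated.put("last.seen.id", lastSeenId);
        stateManager.setState(updated); // takes effect right away
    }
}
```

In a real processor the equivalent calls would go through `ProcessContext.getStateManager()`, and every `setState` would reach the remote store immediately.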
This is not a terrible API, but it comes with a few drawbacks.
In a processor annotated with @SupportsBatching, calling ProcessContext.getStateManager().getState(Scope) for each FlowFile can be costly. To avoid this, processors typically end up caching the values themselves.
Depending on the code, managing the state map can be difficult, and there is no way to roll back state changes once they are applied, because the call to setState() immediately updates the remote state.
When executing within a different context, one in which we want to store state atomically with the FlowFiles that resulted in the state change, there is currently no way to do that.
To overcome these problems, we should allow state to be set, retrieved, cleared, and replaced via the ProcessSession, in addition to the State Manager. That is, a Processor developer may do either of:
The former would behave as it does now, immediately updating state on the remote system (ZooKeeper, for example). The latter would simply update an in-memory copy of the state in the Process Session. When ProcessSession.commit() is called, it would push the new state to the remote system. If the session is rolled back, it would simply not update the state. This allows the state to be set in the middle of the processor's algorithm, rather than requiring that it be held onto until after the session commit succeeds. If the session is instead checkpointed (via session.commit() while running with a Run Duration greater than 0 ms), the Session Checkpoint will keep the state. Rolling back the session but not the checkpoint would then result in the checkpointed state still being pushed out.
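The commit, rollback, and checkpoint behavior described above can be modeled with a small in-memory sketch. Everything here is hypothetical and is not NiFi code: `RemoteStateStore` stands in for the remote system, and `SessionStateSketch` stands in for the proposed session-level state handling. Calling `RemoteStateStore.set` directly models the State Manager path, which updates the remote state immediately. The read order in `getState` (pending changes, then checkpoint, then remote) is an assumption, since the proposal does not spell it out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the remote state store (ZooKeeper, for example).
class RemoteStateStore {
    private Map<String, String> state = new HashMap<>();

    Map<String, String> get() { return new HashMap<>(state); }

    void set(Map<String, String> newState) { state = new HashMap<>(newState); }
}

// Hypothetical model of the proposed session-level state handling.
class SessionStateSketch {
    private final RemoteStateStore remote;
    private Map<String, String> checkpointed; // state kept by the checkpoint, null if none
    private Map<String, String> pending;      // in-memory copy for the current session, null if unchanged

    SessionStateSketch(RemoteStateStore remote) { this.remote = remote; }

    // Assumed read order: pending changes, then the checkpoint, then the remote store.
    Map<String, String> getState() {
        if (pending != null) return new HashMap<>(pending);
        if (checkpointed != null) return new HashMap<>(checkpointed);
        return remote.get();
    }

    // Only updates the in-memory copy; the remote store is untouched.
    void setState(Map<String, String> newState) { pending = new HashMap<>(newState); }

    // Models a commit that becomes a checkpoint (Run Duration > 0 ms).
    void checkpoint() {
        if (pending != null) { checkpointed = pending; pending = null; }
    }

    // Discards only the current session's changes; the checkpoint is kept.
    void rollback() { pending = null; }

    // Models the final commit: push the newest surviving state to the remote store.
    void commit() {
        checkpoint();
        if (checkpointed != null) { remote.set(checkpointed); checkpointed = null; }
    }
}
```

In this model, setting `offset=10`, checkpointing, setting `offset=20`, rolling back, and then committing leaves `offset=10` in the remote store: the rollback discards only the change made after the checkpoint.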