Currently the counter of numRecordsIn is setup while processing input in processor. In order to integrate the processing logic based on StreamTaskInput#emitNext(Output) later, we need to pass the counter into output functions then. So there are three reasons to do this:
- It is the precondition of following integration work.
- We could make the counter as final fields in StreamInputProcessor and StreamTwoInputSelectableProcessor.
- We could reuse the counter setup logic for all the input processors.
There should be no side effects if we make the counter setup a bit earlier than the previous way.