Let me explain my motivations for doing it this way.
Some goals I had for adding general decorator/interceptor support were the following:
- Works consistently regardless of which components are in the system, e.g. custom Sources/Sinks
- No counter-intuitive behavior, i.e. it should be straightforward to write an Interceptor and it shouldn't be easy to inadvertently create bad side effects in the system
- Don't require changes to existing interfaces
Based on the above goals, the only place where these plugins really fit was the ChannelProcessor. The way it works, as you can see from the patch, is that a Source generates an Event and calls the ChannelProcessor.processEvent(Event e) method (or processEventBatch). The ChannelProcessor is responsible for opening a Transaction, putting the Event in the Channel, and closing the Transaction. For correctness reasons, the interception / transformation of the Events really has to take place inside of a Transaction. Also, from a fan-out perspective, if you are doing an operation on an Event that you want to replicate downstream, it makes sense to do that operation only once for the sake of efficiency. For all of these reasons, the ChannelProcessor is a good place to put Interceptor handling.
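To make that flow concrete, here is a minimal sketch of the idea in Java. It is not the code from the patch: SketchEvent, SketchInterceptor, SketchChannel and SketchChannelProcessor are hypothetical, simplified stand-ins, and the toy channel folds the Transaction into begin()/commit()/rollback() methods. It only illustrates that the interceptor chain runs once per Event, inside the transactional scope, before the Event is put into every Channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT Flume's real interfaces.
final class SketchEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;
    SketchEvent(byte[] body) { this.body = body; }
}

interface SketchInterceptor {
    // Returns the (possibly modified) event, or null to drop it.
    SketchEvent intercept(SketchEvent event);
}

// Toy channel: put() only becomes visible to consumers after commit().
final class SketchChannel {
    final List<SketchEvent> committed = new ArrayList<>();
    private final List<SketchEvent> pending = new ArrayList<>();
    void begin() { pending.clear(); }
    void put(SketchEvent e) { pending.add(e); }
    void commit() { committed.addAll(pending); pending.clear(); }
    void rollback() { pending.clear(); }
}

final class SketchChannelProcessor {
    private final List<SketchInterceptor> interceptors;
    private final List<SketchChannel> channels;

    SketchChannelProcessor(List<SketchInterceptor> interceptors,
                           List<SketchChannel> channels) {
        this.interceptors = interceptors;
        this.channels = channels;
    }

    // Called by a Source; the Source never touches a transaction itself.
    void processEvent(SketchEvent event) {
        for (SketchChannel ch : channels) ch.begin();
        try {
            // Run the chain once, no matter how many channels fan out.
            SketchEvent current = event;
            for (SketchInterceptor i : interceptors) {
                current = i.intercept(current);
                if (current == null) break; // event was dropped
            }
            if (current != null) {
                for (SketchChannel ch : channels) ch.put(current);
            }
            // Note: this toy commit is not atomic across channels.
            for (SketchChannel ch : channels) ch.commit();
        } catch (RuntimeException ex) {
            // A failing interceptor or put leaves nothing behind.
            for (SketchChannel ch : channels) ch.rollback();
            throw ex;
        }
    }
}
```

With two Channels configured, an interceptor that adds a header is invoked once and both Channels receive the already-transformed Event. If an interceptor throws, every Channel is rolled back and nothing is committed.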
As you have rightly observed, there is no corresponding interceptor on the Sink side of the Channel for cases where we may only want to apply some transformation to the Events flowing to a given Sink. I did not add this yet for the following reasons:
- There is no counterpart to ChannelProcessor on the Sink side. There is something called SinkProcessor, but that is basically a misnomer; it should really be called SinkPolicy or something. Each Sink is actually in charge of opening a Transaction, take()ing from the Channel, and committing the Transaction. Because the Inversion of Control that exists on the Source side does not exist on the Sink side, there is no clear place to plug in a driver for the Interceptor interface without requiring each Sink to implement it.
- The Interceptor concept is a little messy on the Sink side, particularly with MemoryChannel. Modifying Events in place in an Interceptor is safe on the Source side, because Sources are not allowed to buffer Events. On the Sink side, however, we would be modifying Events that may get returned to the Channel by a Transaction rollback(), so a failed Transaction could cause the same Event to be processed multiple times. Obviously that would be wrong, so the alternative is to defensively copy the Events before giving them to an Interceptor. That represents a potential performance concern that has to be considered carefully.
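The rollback problem in the second point can be shown with a small Java sketch. Everything here is hypothetical and made up for illustration: ToyMemoryChannel is a toy in-memory queue, not Flume's MemoryChannel, and CopyEvent and SinkSideSketch do not exist in Flume. It only assumes a channel that hands back the same Event objects after a rollback().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical event type with an explicit deep-ish copy.
final class CopyEvent {
    final Map<String, String> headers;
    final byte[] body;
    CopyEvent(Map<String, String> headers, byte[] body) {
        this.headers = new HashMap<>(headers);
        this.body = body.clone();
    }
    CopyEvent copy() { return new CopyEvent(headers, body); }
}

// Toy in-memory channel: rollback() returns the SAME objects to the queue.
final class ToyMemoryChannel {
    private final Deque<CopyEvent> queue = new ArrayDeque<>();
    private final Deque<CopyEvent> taken = new ArrayDeque<>();
    void put(CopyEvent e) { queue.addLast(e); }
    CopyEvent take() {
        CopyEvent e = queue.pollFirst();
        if (e != null) taken.addLast(e);
        return e;
    }
    void commit() { taken.clear(); }
    void rollback() {
        // Push taken events back to the head, preserving their order.
        while (!taken.isEmpty()) queue.addFirst(taken.pollLast());
    }
}

final class SinkSideSketch {
    // Takes one event and runs the interceptor on it, optionally on a copy.
    static CopyEvent takeAndIntercept(ToyMemoryChannel ch,
                                      UnaryOperator<CopyEvent> interceptor,
                                      boolean defensiveCopy) {
        CopyEvent e = ch.take();
        if (e == null) return null;
        // Without the copy, in-place changes leak back into the channel
        // if the surrounding transaction is later rolled back.
        return interceptor.apply(defensiveCopy ? e.copy() : e);
    }
}
```

With an interceptor that mutates a header in place, taking without the copy, rolling back, and taking again applies the mutation twice to the same Event. With the copy, the Event in the channel stays untouched and the retry sees it exactly once, at the cost of one extra allocation per take().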
At this point in time, I believe that Source-side interceptors are a step in the right direction, and are more straightforward and less risky to implement than Sink-side decorators. With just these, it's possible to create another tier (logical or physical) to apply different Event processing to events going to different Sinks, if required, so there is a workaround.
Anyway, I share your concerns and I have been thinking a lot about how to make Sink-side decorators work. I'd like to come up with a solution that creates a basically symmetric system without breaking existing interfaces. Some of the other folks working on Flume are thinking about this use case also and I think we will come up with a workable solution soon.