Flume / FLUME-2799

Kafka Source - Message Offset and Partition add to headers

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Implemented
    • Affects Version/s: v1.6.0
    • Fix Version/s: v1.7.0
    • Component/s: Sinks+Sources
    • Labels:
    • Flags:
      Patch

      Description

Currently the Kafka source persists only the original Kafka message's topic into the Flume event headers.

Downstream interceptors and sinks may also want the message's partition and offset available to them, so we need to add these as well.

It is also worth noting that, unlike other sources such as JMS, the conversion from MessageAndMetaData to a FlumeEvent is not configurable.
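The improvement described above can be sketched as follows. This is a minimal illustration only: the class name and header keys ("topic", "partition", "offset") are assumptions for the example, not necessarily those used by the attached patch.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed change: copy Kafka message metadata (topic,
// partition, offset) into Flume event headers so that downstream
// interceptors and sinks can read them.
public class KafkaHeaderSketch {

    public static Map<String, String> toHeaders(String topic, int partition, long offset) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", topic);                          // already set by the source today
        headers.put("partition", String.valueOf(partition));  // proposed addition
        headers.put("offset", String.valueOf(offset));        // proposed addition
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> h = toHeaders("orders", 3, 1547L);
        System.out.println(h.get("topic") + "/" + h.get("partition") + "@" + h.get("offset"));
    }
}
```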

      1. FLUME-2799-0.patch
        11 kB
        Michael Andre Pearce (IG)

        Activity

roshan_naik Roshan Naik added a comment -

Gwen Shapira would you be able to review this?
roshan_naik Roshan Naik added a comment -

My comments:

• Needs an update to the documentation wrt the Converter, converter config and custom converters.
• The default converter here applies 4 headers (2 old, plus 2 new ones). Adding headers to every event is expensive in terms of memory (and also some CPU due to added GC pressure). Which headers to apply should be user-selectable, with the default settings preserving existing behavior.
• The need for introducing a Converter for adding additional headers may be a bit of an overkill, but acceptable.
jholoman Jeff Holoman added a comment -

I haven't had a chance to look at the patch, but it's worth noting that we are planning on revamping the source for Kafka 0.9.0. Perhaps it's best to hold off until those changes are in?
gwenshap Gwen Shapira added a comment -

Do we really want Sources to have their own "pre-source" pluggable interceptor?

I sympathize with the requirement, but perhaps something more "configurable" and less "coding" would be better for managing the headers (a bit like what the HDFS sink does for directory names, only in reverse). After all, simple and configurable sources are part of the big value in Flume.
michael.andre.pearce Michael Andre Pearce (IG) added a comment - edited

The core requirement here is that currently not all of the metadata is transformed / transferred to the FlumeEvent, so interceptors and sinks cannot use the data.

This is a blocking issue for the organisation I work for in implementing Flume in our workflows where we use Kafka. (I am also aware of another org.)

RE The Solution:
The intent was to split consumption from transformation into separate entities; it keeps the two concerns separate.
The design, as per the ticket, mimics that of the more mature JMS solution already in Flume. Its intent is not to be an interceptor but to convert from the source object event to a FlumeEvent.
Happy to merge the two, but I think it is cleaner to separate the transform (from source-system event to Flume event) from the consume code, and I think the JMS solution is cleaner and neater.

As for the number of headers, this is far fewer headers / metadata than will come from sources such as JMS. Also note that the data will already be in memory, in the MessageAndMetaData that we transform from.

RE Support for Kafka 0.9.0:
This ticket has been open since September; the patch is developed, solves the issue, and all existing tests pass, so I do not see any reason to hold off merging the functionality. Unless the new consumer is ready for the next release with these features supported, I do not see why one would hold off.

Also be aware that the Kafka 0.9.0 consumer has only just been released, and as such not many organisations will upgrade instantly; using the old consumer in Flume for some period of time will be needed for compatibility. The new consumer in 0.9.0 is not backwards compatible with older Kafka brokers, but older consumers are compatible with the 0.9.0 broker.
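The JMS-style consume/transform split discussed in the comment above could look roughly like this. All names here (ConsumedRecord, KafkaMessageConverter, DefaultConverter) are hypothetical stand-ins, not the classes from the attached patch; the choice of four headers (timestamp, topic, partition, offset) is an assumption based on the later FLUME-2821 snippet.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the converter split, modeled on Flume's JMS
// converter pattern: consumption and transformation are separate concerns.
public class ConverterSketch {

    // Stand-in for Kafka's MessageAndMetaData.
    public static final class ConsumedRecord {
        public final String topic; public final int partition;
        public final long offset; public final byte[] body;
        public ConsumedRecord(String topic, int partition, long offset, byte[] body) {
            this.topic = topic; this.partition = partition;
            this.offset = offset; this.body = body;
        }
    }

    // The pluggable transformation step: consumed record in, event headers out.
    public interface KafkaMessageConverter {
        Map<String, String> headers(ConsumedRecord record);
    }

    // A default implementation applying the 4 headers mentioned in the review
    // (2 existing plus the 2 proposed additions).
    public static final class DefaultConverter implements KafkaMessageConverter {
        public Map<String, String> headers(ConsumedRecord record) {
            Map<String, String> h = new HashMap<>();
            h.put("timestamp", String.valueOf(System.currentTimeMillis())); // existing
            h.put("topic", record.topic);                                   // existing
            h.put("partition", String.valueOf(record.partition));           // proposed
            h.put("offset", String.valueOf(record.offset));                 // proposed
            return h;
        }
    }

    public static void main(String[] args) {
        ConsumedRecord r = new ConsumedRecord("orders", 1, 99L, new byte[0]);
        System.out.println(new DefaultConverter().headers(r).keySet());
    }
}
```

Making the two new headers a user-selectable option on this converter, as suggested in the review, would preserve existing behavior by default.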
roshan_naik Roshan Naik added a comment -

• I don't think it is wise to block this requirement on Kafka 0.9. This ability seems useful in its own right.
• Functionally, it does seem to overlap with the notion of interceptors even if that is not the intention. JMS converters deal more with the body and less with headers.
• Rather than each source implementing its own converters, it would be better to have a common reusable converter system shared by other sources, which then brings into question the need for interceptors.
• Although well motivated, it feels excessive to introduce converters in this ticket, which deals with merely adding a couple of headers.
• My thoughts:
  + Documentation needs an update.
  + Make the new headers optional (and disabled by default) so that existing users don't see any impact.
  + If you are willing to simplify it to do this without converters, it would make for a simpler review and require less debate. Unless you have any other thoughts?
michael.andre.pearce Michael Andre Pearce (IG) added a comment - edited

Will look at updating the patch with the converter merged back in, so there is a single class and an option flag for the additional headers. That reduces the change as you suggest.

Please can you point me to the doc you want/need updated.

Cheers
M
roshan_naik Roshan Naik added a comment -

In flume-ng-doc/sphinx/FlumeUserGuide.rst, look for the Kafka source section.
roshan_naik Roshan Naik added a comment -

Michael Andre Pearce (IG) any update?
        liorze Lior Zeno added a comment -

        This is implemented in the new KafkaSource. See FLUME-2821.

        KafkaSource.java
        if (!headers.containsKey(KafkaSourceConstants.TIMESTAMP_HEADER)) {
          headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
              String.valueOf(System.currentTimeMillis()));
        }
        if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
          headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
        }
        if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
          headers.put(KafkaSourceConstants.PARTITION_HEADER,
              String.valueOf(message.partition()));
        }
        
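For context on why these headers are useful downstream: once the source sets them, a sink can reference them via Flume's `%{header}` substitution in configuration. The following agent config fragment is a hedged illustration (agent/sink names are made up, and it assumes header keys matching the snippet above):

```properties
# Hypothetical agent config: an HDFS sink laying out files by the
# Kafka topic and partition headers set by the source.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/kafka/%{topic}/%{partition}
```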

People

• Assignee: Unassigned
• Reporter: michael.andre.pearce Michael Andre Pearce (IG)
• Votes: 0
• Watchers: 6


Time Tracking

• Original Estimate: 24h
• Remaining Estimate: 24h
• Time Spent: Not Specified
