  1. Apache NiFi
  2. NIFI-9822

Update ConsumeKafkaRecord to allow writing out of the Kafka record key



    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Extensions
    • Labels: None


      The ConsumeKafkaRecord processors are among the most commonly used in NiFi, as they provide a very efficient mechanism for consuming structured data from Kafka. The downside to these processors is that they do not support writing out the Kafka record's key. This was done because we wanted to bundle the records' values together into a single FlowFile, and we didn't have a good way to include the key.

      For users who don't care about the key, this works great. Users who do need the key are often forced to use the non-record-oriented ConsumeKafka, which adds the key as an attribute. But this means that the key may need to be hex-encoded, which makes it less usable, and it means that we are creating a FlowFile per Kafka record.

      We should improve this by introducing some new properties to the ConsumeKafkaRecord processors:

      • Output Strategy. This property should have the following values:
          - Write Value Only - This should be the default value in order to maintain backward compatibility and should behave the same as it does now.
          - Use Wrapper - If selected, records that are provided to the Record Writer should be wrapped in a wrapper element that contains 4 keys: "key" (the Kafka record key), "value" (the Kafka record value), "headers" (a Map type of field with Strings as both the keys and values), and "metadata" (should include topic, partition, offset, timestamp, checksum).
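
To make the proposed "Use Wrapper" shape concrete, here is a minimal sketch that models the wrapper with plain Java maps. It does not use the NiFi Record API; the class name `KafkaWrapperSketch`, the `wrap` method, and the sample values are hypothetical and only illustrate the four top-level fields and the metadata entries listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of NiFi: models the proposed wrapper as plain maps.
final class KafkaWrapperSketch {

    // Builds a wrapper with the four fields named in the proposal:
    // "key", "value", "headers", and "metadata".
    static Map<String, Object> wrap(Object key,
                                    Object value,
                                    Map<String, String> headers,
                                    String topic,
                                    int partition,
                                    long offset,
                                    long timestamp,
                                    Long checksum) {
        // Metadata entries follow the list in the proposal.
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("topic", topic);
        metadata.put("partition", partition);
        metadata.put("offset", offset);
        metadata.put("timestamp", timestamp);
        metadata.put("checksum", checksum); // may be null if unavailable (assumption)

        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("key", key);
        wrapper.put("value", value);
        wrapper.put("headers", headers);
        wrapper.put("metadata", metadata);
        return wrapper;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc123");

        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Alice");

        // Sample values are made up for illustration.
        System.out.println(wrap("user-42", value, headers, "users", 0, 17L, 1650000000000L, null));
    }
}
```

With "Write Value Only", only the content of `value` would reach the Record Writer; with "Use Wrapper", the whole structure would.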

      If the Output Strategy selected is "Use Wrapper", we should provide the following properties:
        - Key Format - Allowable Values of (String, Byte Array, Record)
        - Key Record Reader - The Record Reader to use for parsing the key. Should be dependent on Key Format = "Record".
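
The three Key Format values could be handled roughly as in the sketch below. This is not NiFi code: `KeyFormatSketch`, `interpretKey`, and the `Function<byte[], Object>` standing in for a Record Reader are hypothetical, and decoding String keys as UTF-8 is an assumption the ticket does not state.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch, not part of NiFi: turns a raw Kafka key into the
// representation selected by the proposed "Key Format" property.
final class KeyFormatSketch {

    enum KeyFormat { STRING, BYTE_ARRAY, RECORD }

    // keyRecordReader stands in for the proposed "Key Record Reader";
    // it is only consulted when the format is RECORD.
    static Object interpretKey(byte[] rawKey,
                               KeyFormat format,
                               Function<byte[], Object> keyRecordReader) {
        if (rawKey == null) {
            return null; // Kafka records may have no key
        }
        switch (format) {
            case STRING:
                // Assumption: keys are decoded as UTF-8.
                return new String(rawKey, StandardCharsets.UTF_8);
            case BYTE_ARRAY:
                return rawKey;
            case RECORD:
                if (keyRecordReader == null) {
                    throw new IllegalArgumentException(
                            "A Key Record Reader is required when Key Format is Record");
                }
                return keyRecordReader.apply(rawKey);
            default:
                throw new IllegalStateException("Unknown key format: " + format);
        }
    }
}
```

The null check on the reader mirrors the dependency described above: the reader is only meaningful, and only required, when the key is parsed as a record.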

      Additionally, the headers and the Kafka record key should only be added as attributes when using an Output Strategy of "Write Value Only". As a result, the following existing properties should be made dependent on using an Output Strategy of "Write Value Only":
        - Headers to Add as Attributes (Regex)
        - Key Attribute Encoding
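
The property dependencies described above can be summarized as a small decision function. This sketch is self-contained and does not use the NiFi API (in a processor, `PropertyDescriptor.Builder#dependsOn` would be the likely mechanism); the class `PropertyDependencySketch` and its method are hypothetical, and only the property display names come from this ticket.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of NiFi: lists which of the properties
// discussed in this ticket apply for a given configuration.
final class PropertyDependencySketch {

    enum OutputStrategy { WRITE_VALUE_ONLY, USE_WRAPPER }

    enum KeyFormat { STRING, BYTE_ARRAY, RECORD }

    static List<String> applicableProperties(OutputStrategy strategy, KeyFormat keyFormat) {
        List<String> properties = new ArrayList<>();
        properties.add("Output Strategy"); // always shown

        if (strategy == OutputStrategy.WRITE_VALUE_ONLY) {
            // Existing attribute-related properties apply only in this mode.
            properties.add("Headers to Add as Attributes (Regex)");
            properties.add("Key Attribute Encoding");
        } else {
            // Wrapper mode exposes the new key-handling properties.
            properties.add("Key Format");
            if (keyFormat == KeyFormat.RECORD) {
                properties.add("Key Record Reader");
            }
        }
        return properties;
    }
}
```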

      It will also be important to update the additionalDetails.html to explain the differences between the two output modes, and provide examples, including when one strategy should be preferred over the other.




            pgrey Paul Grey
            markap14 Mark Payne



              Time Tracking

                Original Estimate - Not Specified
                Remaining Estimate - 0h
                Time Spent - 1h 10m