FLUME-2852: Kafka Source/Sink should optionally read/write Flume records

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v1.6.0
    • Fix Version/s: v1.7.0
    • Component/s: Sinks+Sources
    • Labels:
      None
    • Release Note:
      Added ability to preserve event headers when using Kafka Sink and Kafka Source together by using Avro serialisation (useFlumeEventFormat property).

      Description

Currently the Kafka Sink writes only the event body to Kafka rather than an Avro datum. This works fine when paired with a Kafka Source or a Kafka Channel; however, it means that any Flume headers are lost when events are transported via Kafka.

The request is to implement an equivalent of the Kafka Channel's parseAsFlumeEvent parameter for the sink and source, so that Avro datums can optionally be read from and written to Kafka.
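Per the release note, the feature landed as the useFlumeEventFormat property on both the Kafka Sink and Kafka Source. A minimal agent configuration sketch, assuming the Flume 1.7 property names (kafka.bootstrap.servers, kafka.topic / kafka.topics) and illustrative agent/component names:

```properties
# Sink side: serialize the full event (headers + body) as an AvroFlumeEvent
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = broker1:9092
a1.sinks.k1.kafka.topic = flume-events
a1.sinks.k1.useFlumeEventFormat = true

# Source side: parse incoming messages as AvroFlumeEvents, restoring headers
a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a2.sources.r1.kafka.bootstrap.servers = broker1:9092
a2.sources.r1.kafka.topics = flume-events
a2.sources.r1.useFlumeEventFormat = true
```

Both sides must agree: if the sink writes the Flume event format but the source does not set useFlumeEventFormat, the delivered event body will contain raw Avro bytes rather than the original payload.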

      1. FLUME-2852.patch
        34 kB
        Tristan Stevens
      2. FLUME-2852v4.patch
        28 kB
        Tristan Stevens
      3. FLUME-2852v5.patch
        29 kB
        Tristan Stevens

Activity

Jeff Holoman added a comment -

          Just want to make sure I understand the issue as I'm working on all of the Kafka components as part of FLUME-2820.

          Are you suggesting to write Flume's AvroFlumeEvent as the message? Would you propose to persist the schema in the header? Can you help me understand your flow?

Tristan Stevens added a comment -

Yes, this would absolutely be the AvroFlumeEvent, and it would be without the schema, as in the Avro Sink / Avro Source pairing.

          Here is the code from the Kafka Channel:

if (parseAsFlumeEvent) {
  if (!tempOutStream.isPresent()) {
    tempOutStream = Optional.of(new ByteArrayOutputStream());
  }
  if (!writer.isPresent()) {
    writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
  }
  tempOutStream.get().reset();
  AvroFlumeEvent e = new AvroFlumeEvent(
      toCharSeqMap(event.getHeaders()),
      ByteBuffer.wrap(event.getBody()));
  encoder = EncoderFactory.get()
      .directBinaryEncoder(tempOutStream.get(), encoder);
  writer.get().write(e, encoder);
  // Not really possible to avoid this copy
  serializedEvents.get().add(tempOutStream.get().toByteArray());
} else {
  serializedEvents.get().add(event.getBody());
}

          The flow in this case is Syslog Source -> Memory Channel -> Kafka Sink -> Kafka Broker -> Kafka Source -> Memory Channel -> HDFS Sink

          Although in the future I'd like to make it: Syslog Source -> Kafka Channel -> Kafka Sink -> Kafka Broker -> Kafka Source -> Kafka Channel -> HDFS Sink

          N.B. The three tiers run in different sites.
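To illustrate the round trip the snippet above implements on the write side, here is a JDK-only sketch: it is not the actual patch code (which uses Avro's AvroFlumeEvent with SpecificDatumWriter/SpecificDatumReader), and the class and method names are hypothetical, but it shows why encoding headers together with the body preserves them across the Kafka hop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the Avro round trip: pack headers + body into
// the single byte[] a sink would publish to Kafka, then unpack it on the
// source side so no header is lost in transit.
public class FlumeEventRoundTrip {

    public static byte[] encode(Map<String, String> headers, byte[] body) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            // header count, then each key/value pair, then the body
            out.writeInt(headers.size());
            for (Map.Entry<String, String> h : headers.entrySet()) {
                out.writeUTF(h.getKey());
                out.writeUTF(h.getValue());
            }
            out.writeInt(body.length);
            out.write(body);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen on in-memory streams
        }
    }

    public static Map.Entry<Map<String, String>, byte[]> decode(byte[] message) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
            int headerCount = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < headerCount; i++) {
                headers.put(in.readUTF(), in.readUTF());
            }
            byte[] body = new byte[in.readInt()];
            in.readFully(body);
            return new AbstractMap.SimpleEntry<>(headers, body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without such an envelope, only the body bytes reach the far side of the broker, which is exactly the header-loss problem this ticket describes.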

Gonzalo Herreros added a comment -

Why don't you use an Avro Sink and Source to communicate between the tiers?

I think what you are proposing would expose an internal object through external interfaces (sources/sinks).
I wonder, however, why the Kafka headers are not used to pass event headers.

Jeff Holoman added a comment -

          Tristan,
          This seems like a reasonable feature request to me. Gonzalo Herreros, Kafka doesn't have the concept of headers, beyond a single key. What Tristan is proposing is to extract the key if necessary, and encode the entire Event as an AvroFlumeEvent. In this way, headers could be persisted across clusters.

          I would suggest we hold off on the request, however, until we complete the update for Kafka 0.9.

Tristan Stevens added a comment -

          I'll take this one as and when FLUME-2820 is done, if that's okay?

Jeff Holoman added a comment -

          Ok by me

Tristan Stevens added a comment -

          Patch based on uncommitted changes from FLUME-2820 (FLUME-2821, FLUME-2822, FLUME-2823)

Tristan Stevens added a comment -

Added unit tests and strengthened existing tests for regression coverage. Also tested against Kafka 0.9.0 in a full end-to-end deployment.

Tristan Stevens added a comment -

          Attached patch from review board

Tristan Stevens added a comment -

          Reformatted patch

ASF subversion and git services added a comment -

          Commit 8d06a72d9dc660c28e1217891f9c5085b8192085 in flume's branch refs/heads/flume-1.7 from Jarek Jarcec Cecho
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=8d06a72 ]

          FLUME-2852: Kafka Source/Sink should optionally read/write Flume records

          (Tristan Stevens via Jarek Jarcec Cecho)

ASF subversion and git services added a comment -

          Commit 4eb2a3bb510671fe92cec0ebb61d7e78adc8f526 in flume's branch refs/heads/trunk from Jarek Jarcec Cecho
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=4eb2a3b ]

          FLUME-2852: Kafka Source/Sink should optionally read/write Flume records

          (Tristan Stevens via Jarek Jarcec Cecho)

Jarek Jarcec Cecho added a comment -

          Thank you for your contribution Tristan Stevens!

Hudson added a comment -

          FAILURE: Integrated in Flume-trunk-hbase-1 #155 (See https://builds.apache.org/job/Flume-trunk-hbase-1/155/)
          FLUME-2852: Kafka Source/Sink should optionally read/write Flume records (jarcec: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=4eb2a3bb510671fe92cec0ebb61d7e78adc8f526)

          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java

People

• Assignee: Tristan Stevens
• Reporter: Tristan Stevens
• Votes: 1
• Watchers: 6
