Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: None
    • Labels:
      None

      Description

      Here is the rationale:

• Kafka provides an HA channel, which means a dead agent does not affect the data in the channel - thus reducing delivery delay.
• Kafka is used by many companies - it would be a good idea to use Flume to pull data from Kafka and write it to HDFS, HBase, etc.

This channel is not going to be useful for cases where Kafka is not already used, since it brings the operational overhead of maintaining two systems; but if Kafka is already in use, this is a good way to integrate Kafka and Flume.

Here is a scratch implementation: https://github.com/harishreedharan/flume/blob/kafka-channel/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
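
For orientation, here is a minimal conceptual sketch of the put/commit idea behind such a channel, written against the Kafka 0.8 producer API. This is not the attached patch; the class name, topic name, and broker list are illustrative assumptions only.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Conceptual sketch only (not the attached patch): events "put" into the
// channel are buffered per transaction and flushed to a Kafka topic on commit,
// so the broker cluster, not the agent, holds the in-flight data.
public class KafkaChannelSketch {
  private final Producer<byte[], byte[]> producer;
  private final String topic = "flume-channel";   // illustrative topic name
  private final List<KeyedMessage<byte[], byte[]>> txBuffer =
      new ArrayList<KeyedMessage<byte[], byte[]>>();

  public KafkaChannelSketch(String brokerList) {
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList); // e.g. "broker1:9092,broker2:9092"
    this.producer = new Producer<byte[], byte[]>(new ProducerConfig(props));
  }

  // Source side: buffer the event body inside the current transaction.
  public void put(byte[] eventBody) {
    txBuffer.add(new KeyedMessage<byte[], byte[]>(topic, eventBody));
  }

  // On commit, send the whole batch to Kafka; only then is the put acknowledged,
  // which is what keeps a dead agent from losing channel data.
  public void commit() {
    producer.send(txBuffer);
    txBuffer.clear();
  }
}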

Attachments

1. FLUME-2500.patch
        48 kB
        Hari Shreedharan
      2. FLUME-2500.patch
        49 kB
        Hari Shreedharan
      3. FLUME-2500-1.patch
        49 kB
        Hari Shreedharan
      4. FLUME-2500-2.patch
        48 kB
        Hari Shreedharan


          Activity

          jarcec Jarek Jarcec Cecho added a comment -

I like the idea of a Kafka channel, so I'm +1 on it!

roshan_naik Roshan Naik added a comment - edited

          My thoughts ...

• It seems better for clients to write directly to Kafka and bypass the Flume Kafka channel altogether in such a case.
• The data flow seems unnecessarily complex: data gets pushed out to a remote service when going from source -> kafka channel, then brought back to the local host when the event flows from channel -> sink. A simpler data flow would be something like client -> Kafka (via a local Flume agent using a Kafka sink), and then some subscriber that pulls from Kafka.
• Kafka being a remote service, both Flume sources and sinks will get coupled to intermittent failures when communicating with Kafka (sort of like the JDBC channel).
hshreedharan Hari Shreedharan added a comment - edited

          Roshan:

There are two motivations for this channel:

• Provide a distributed channel for Flume. We could implement one natively in Flume - which I would definitely prefer, and which would be better operations-wise, but I don't have the cycles to do - or use an existing service (like Hazelcast, which Ashish used, or Kafka in this case).
• Given that a user has a Kafka cluster, use Flume's sources and sinks to either get data into Kafka from the variety of Flume sources, or write from Kafka to HDFS using Flume's sinks. The reason a channel works better than the Kafka source is that a dead Flume agent won't affect event delivery - the events just get routed via another agent.
          hshreedharan Hari Shreedharan added a comment -

Initial implementation, with tests, supporting ingest of data from Kafka that may not have been inserted via Flume. Ran tests in loops on Travis: https://travis-ci.org/harishreedharan/flume/builds/38093100

roshan_naik Roshan Naik added a comment - edited

I see - the distributed channel makes for an interesting new use case.

I assume this implies the following topology:

[ app server -> local flume agent -> flume with kafka channel -> destination ]

That does bring up the question of why one would introduce a Flume tier for such use cases instead of writing directly to Kafka.

Basically, whatever is delivering the data to a Flume agent with a Kafka channel (a Flume agent or something else) could instead be writing to Kafka directly - that is, this topology:

[ app server -> local flume agent -> kafka -> flume -> destination ]

or alternatively

[ app server -> Kafka -> flume -> destination ]

Effectively, Kafka would function as the distributed channel / aggregation layer. Do you see any disadvantages of these topologies?

          hshreedharan Hari Shreedharan added a comment -

Agreed, that pretty much sums it up.

The reason you'd write to Flume is that Flume's API is much easier to use and much less complex than Kafka's. Also, the Kafka API changes between major releases, and this shields the user from that. (This is my opinion, others might disagree - having written code against Kafka, I get this feeling.)

Flume is also much more effective at aggregating from a large number of producers - and just a handful of consumers - especially when sending data over a cross-data-center link.
So you'd have [ a large number of producers ] -> [ a few flume agents using the kafka channel ] -> hdfs.

I expect both of these to pop up.
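
To make the API-simplicity point concrete, here is a minimal sketch of what a client writing to a Flume agent looks like; the host, port, and message body are placeholders.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

// Minimal sketch: the application hands an event to a local Flume agent over
// Avro RPC; the agent's channel and sinks then take care of brokers, retries,
// and delivery details.
public class FlumeClientExample {
  public static void main(String[] args) throws EventDeliveryException {
    RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41414);
    try {
      Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
      client.append(event);  // returns once the receiving agent has committed the event to its channel
    } finally {
      client.close();
    }
  }
}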

          jarcec Jarek Jarcec Cecho added a comment -

Could you also post the patch to Review Board, Hari Shreedharan?

          hshreedharan Hari Shreedharan added a comment -

Latest patch - on RB.

          hshreedharan Hari Shreedharan added a comment -

          New patch from RB

          hshreedharan Hari Shreedharan added a comment -

          Final patch from RB

          jira-bot ASF subversion and git services added a comment -

          Commit aef02df10a26a6b6911b771a506994f2069857cf in flume's branch refs/heads/trunk from Jarek Jarcec Cecho
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=aef02df ]

          FLUME-2500: Add a channel that uses Kafka

          (Hari Shreedharan via Jarek Jarcec Cecho)

          jira-bot ASF subversion and git services added a comment -

          Commit 832594a293dd9f4fc3d45164215aa776bea2c1f9 in flume's branch refs/heads/flume-1.6 from Jarek Jarcec Cecho
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=832594a ]

          FLUME-2500: Add a channel that uses Kafka

          (Hari Shreedharan via Jarek Jarcec Cecho)

          jarcec Jarek Jarcec Cecho added a comment -

          I've just committed the change, thank you Hari Shreedharan for the contribution and Gwen Shapira for the review!

          hudson Hudson added a comment -

          UNSTABLE: Integrated in Flume-trunk-hbase-98 #42 (See https://builds.apache.org/job/Flume-trunk-hbase-98/42/)
          FLUME-2500: Add a channel that uses Kafka (jarcec: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=aef02df10a26a6b6911b771a506994f2069857cf)

          • flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties
          • flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties
          • flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
          • flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          • flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
          • flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
          • flume-ng-channels/flume-kafka-channel/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
          • flume-ng-channels/pom.xml
          hudson Hudson added a comment -

          UNSTABLE: Integrated in flume-trunk #683 (See https://builds.apache.org/job/flume-trunk/683/)
          FLUME-2500: Add a channel that uses Kafka (jarcec: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=aef02df10a26a6b6911b771a506994f2069857cf)

          • flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties
          • flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
          • flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
          • flume-ng-channels/pom.xml
          • flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          • flume-ng-channels/flume-kafka-channel/pom.xml
          • flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
          • flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties
          litao1990 Tao Li added a comment -

I have a question about this issue.
Flume supports transactions between the source and the channel. So for the Kafka channel:
1. If we use a "sync" Kafka producer, it can certainly guarantee the transaction, but synchronous sends have low performance.
2. If we use an "async" Kafka producer, the performance will be better, but it can't guarantee the transaction.

What do you think about it?
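
For reference, a sketch of the 0.8-era producer settings this trade-off hinges on; the broker, topic, and payload are placeholders, and whether and how the channel exposes these settings is up to the implementation.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch of the knobs behind the sync-vs-async trade-off with the 0.8 producer
// API: a "sync" producer that waits for broker acks gives the delivery
// guarantee a channel transaction needs, at the cost of a blocking send; an
// "async" producer batches in the background, so a commit could return before
// the events are actually on the brokers.
public class ProducerModeExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("metadata.broker.list", "broker1:9092");                // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "sync");                               // "async" trades the guarantee for throughput
    props.put("request.required.acks", "-1");                         // wait for all in-sync replicas

    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    producer.send(new KeyedMessage<String, String>("flume-channel", "event body"));
    producer.close();
  }
}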


            People

• Assignee:
  hshreedharan Hari Shreedharan
• Reporter:
  hshreedharan Hari Shreedharan
• Votes:
  1
• Watchers:
  8
