Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 1.6.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      Add support for Kafka Source

      1. FLUME-2250.patch
        16 kB
        Frank Yao
      2. FLUME-2250-0.patch
        9 kB
        Ashish Paliwal
      3. FLUME-2250-1.patch
        8 kB
        Ashish Paliwal
      4. FLUME-2250-2.patch
        37 kB
        Gwen Shapira
      5. FLUME-2250-3.patch
        43 kB
        Gwen Shapira
      6. FLUME-2250-4.patch
        43 kB
        Gwen Shapira


          Activity

          gwenshap Gwen Shapira added a comment -

          Thank you Frank Yao - this is a great contribution for Kafka, and it's all thanks to you.

          ybaniu Frank Yao added a comment -

          Thanks Ashish and Gwen!

          hudson Hudson added a comment -

          UNSTABLE: Integrated in Flume-trunk-hbase-98 #19 (See https://builds.apache.org/job/Flume-trunk-hbase-98/19/)
          FLUME-2250. Kafka Source. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=0bbd0ad7aaad2f592c596608d4e1981aa86eb53d)

          • flume-ng-dist/pom.xml
          • flume-ng-sources/pom.xml
          • flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          • flume-ng-sources/flume-kafka-source/pom.xml
          • pom.xml
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
          hudson Hudson added a comment -

          UNSTABLE: Integrated in flume-trunk #659 (See https://builds.apache.org/job/flume-trunk/659/)
          FLUME-2250. Kafka Source. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=0bbd0ad7aaad2f592c596608d4e1981aa86eb53d)

          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
          • flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
          • flume-ng-sources/pom.xml
          • flume-ng-dist/pom.xml
          • pom.xml
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
          • flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
          • flume-ng-sources/flume-kafka-source/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          hshreedharan Hari Shreedharan added a comment -

          Committed. Thanks Frank, Ashish and Gwen!

          jira-bot ASF subversion and git services added a comment -

          Commit 9cc8508255078b5a03f5b2899cc2795f376ad2b5 in flume's branch refs/heads/flume-1.6 from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=9cc8508 ]

          FLUME-2250. Kafka Source.

          (Frank Yao, Ashish Paliwal, Gwen Shapira via Hari)

          jira-bot ASF subversion and git services added a comment -

          Commit 0bbd0ad7aaad2f592c596608d4e1981aa86eb53d in flume's branch refs/heads/trunk from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=0bbd0ad ]

          FLUME-2250. Kafka Source.

          (Frank Yao, Ashish Paliwal, Gwen Shapira via Hari)

          hshreedharan Hari Shreedharan added a comment -

          +1. I made a bunch of changes related to whitespace and added a couple of isLogDebugEnabled checks. Running tests and committing.

          I have one comment regarding the kafka.* parameters. They are out of place compared to the other parameters, which are camel-cased. I will file a follow-up JIRA to change these params to camel case - the translation should be done within the component rather than having two different config conventions within one component. This is true for both the Kafka Source and Sink.

          gwenshap Gwen Shapira added a comment -

          Latest version - modified the sink pom and removed the Kafka version. Both sink and source now get their version from the parent pom.

          gwenshap Gwen Shapira added a comment -

          Many fixes according to Hari Shreedharan's comments, and a few extra:

          • Using EventBuilder
          • Using isDebugEnabled
          • Skipping call to channel in case of no events.
          • Moved the creation of the Kafka Consumer to the start() method. Note that this means that if Flume can't connect to ZooKeeper (for any reason, from firewalls to typos), start() will retry the connection every 3 seconds.
          • Tried to clean up the exceptions.
            One exception ("Failed to create message iterator") is still not very clear. As far as I can tell, creating iterator should never fail (and I've never seen it fail). The exception is there "just in case", so I don't have useful details to add.
          • Fixed an issue with hasData
          • Chopped up long lines
          • Improved comments, especially around the Kafka bits.
          • Fixed versions in the pom
          • Parameters that will be passed to Kafka should start with "kafka." (i.e. "kafka.zookeeper.connect" rather than "zookeeper.connect"); this makes the behavior of the Kafka source and sink identical, which is easier for users.
          • Added tests for non-existing topics (no errors, just a lack of messages) and a non-existing ZooKeeper (exception thrown by start())
          hshreedharan Hari Shreedharan added a comment -

          Thanks for the info Gwen!

          gwenshap Gwen Shapira added a comment -

          Kafka's AutoCommit:

          If autocommit is set to true, Kafka will commit offsets for messages in a background thread every 10 seconds (IIRC the default).
          This means that if Flume is unable to write to the channel, messages that we consumed may be lost when autocommit happens.
          It also means that if the Flume agent crashes, we don't know whether we lost messages in the buffer that were not yet written to the channel, or whether we will read the same messages twice because autocommit did not happen.

          If autocommit is disabled, the Kafka Source will commit on every batch. This can slow down ingest rates if the batches are small, but it is a far safer option.

          We recommend autocommit=false. I'll make sure we document this.
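          The recommendation above could look roughly like this in an agent configuration. This is only a sketch: the agent name "a1", the source name "k1", and the exact property keys follow the kafka.* pass-through convention discussed in this thread, not a documented final API.

          ```properties
          # Hypothetical agent "a1" with one Kafka source "k1" (names are illustrative).
          a1.sources = k1
          a1.sources.k1.type = org.apache.flume.source.kafka.KafkaSource
          a1.sources.k1.topic = my-topic
          # Properties prefixed with "kafka." are passed through to the Kafka consumer,
          # e.g. kafka.zookeeper.connect rather than zookeeper.connect.
          a1.sources.k1.kafka.zookeeper.connect = zkhost:2181
          # Disable Kafka's background auto-commit so the source commits per batch,
          # trading some ingest throughput for safety, as recommended above.
          a1.sources.k1.kafka.auto.commit.enable = false
          ```

          Per the discussion, leaving auto-commit enabled risks losing messages that were consumed from Kafka but never written to the channel.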

          gwenshap Gwen Shapira added a comment -

          Regarding timedHasNext():

          Kafka's API is slightly wonky.

          You set a "consumer timeout". hasNext() will block until the timeout (indefinitely by default), and will throw an exception when the timeout is reached. So the only possible return value is "true".
          This is fine when we consume one event at a time, but it is a problem when trying to consume batches (where we may block on each event).
          So I'm wrapping the Kafka API with behavior that we can use for batching.
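          The wrapping Gwen describes can be sketched with a plain iterator. This is an illustrative stand-in only: TimedHasNextSketch is a hypothetical name, and the ConsumerTimeoutException below is a local stub for the exception Kafka's 0.8 consumer iterator throws from hasNext() on timeout, not the real kafka.consumer class.

          ```java
          import java.util.Arrays;
          import java.util.Iterator;

          public class TimedHasNextSketch {

              // Local stub for kafka.consumer.ConsumerTimeoutException.
              static class ConsumerTimeoutException extends RuntimeException {}

              // Kafka's hasNext() either blocks and returns true, or throws on
              // timeout; this wrapper turns that into a plain boolean that a
              // batch loop can test.
              static <T> boolean timedHasNext(Iterator<T> it) {
                  try {
                      return it.hasNext();
                  } catch (ConsumerTimeoutException e) {
                      return false; // timeout: end the current batch
                  }
              }

              public static void main(String[] args) {
                  // Demo iterator: yields two elements, then "times out".
                  final Iterator<String> inner = Arrays.asList("a", "b").iterator();
                  Iterator<String> stream = new Iterator<String>() {
                      public boolean hasNext() {
                          if (inner.hasNext()) return true;
                          throw new ConsumerTimeoutException();
                      }
                      public String next() { return inner.next(); }
                      public void remove() { throw new UnsupportedOperationException(); }
                  };

                  int batchSize = 0;
                  while (timedHasNext(stream)) {
                      stream.next();
                      batchSize++;
                  }
                  System.out.println("batch size = " + batchSize); // prints "batch size = 2"
              }
          }
          ```

          With the real consumer, the consumer.timeout.ms setting bounds how long hasNext() blocks, so a batch loop terminates at either batch capacity or timeout.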

          hshreedharan Hari Shreedharan added a comment -

          In the next iteration of the patch, could you also please add the Kafka dependency to the top-level pom, and have this and the Kafka sink not specify the version, so it is pulled in from the top-level pom?

          hshreedharan Hari Shreedharan added a comment -

          If autocommit is true and processEventBatch fails, what happens? Is the data never ingested? What would be the semantics of such a failure?

          hshreedharan Hari Shreedharan added a comment -

          I took a look. Looks mostly good, some comments:

          • Please use EventBuilder.withBody method to create the event instead of using the SimpleEvent class directly.
          • log.debug("Message: {}", new String(bytes)); -> should go in if (log.isDebugEnabled())
          • If no events were received from Kafka, getChannelProcessor().processEventBatch(eventList); call is a waste. So you probably want to check for the list size before this call.
          • This code should go into start() method, not configure:
                try {
                  this.consumer = KafkaSourceUtil.getConsumer(context);
                } catch (IOException e) {
                  log.error("IOException occur, {}", e.getMessage());
                } catch (InterruptedException e) {
                  log.error("InterruptedException occur, {}", e.getMessage());
                }
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
                if(consumerMap == null) {
                  throw new ConfigurationException("topicCountMap is null");
                }
                List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
                if(topicList == null || topicList.isEmpty()) {
                  throw new ConfigurationException("topicList is null or empty");
                }
                KafkaStream<byte[], byte[]> stream =  topicList.get(0);
                it = stream.iterator();
            
          • Also, those exceptions don't give any details on what they mean. Is this because the config is bad, is it because the topics don't exist in Kafka etc? Basically configure method is called exactly once - so if this is an error that cannot be fixed automatically and requires a reload of the config - it is fine to keep it here, but if a retry can fix it this code should be in start() which gets called until the component is started.
          • Question about this method:
              IterStatus timedHasNext() {
                try {
                  long startTime = System.currentTimeMillis();
                  it.hasNext();
                  long endTime = System.currentTimeMillis();
                  return new IterStatus(true,endTime-startTime);
                } catch (ConsumerTimeoutException e) {
                  return new IterStatus(false,consumerTimeout);
                }
              }
            

            Shouldn't you be checking the return value of it.hasNext()? Or will hasNext() return only when the result is true?

          • This method is never used: public boolean hasData();
          • Couple of nits: Many lines are > 80 chars, and in many places there is no new line after "," - most IDEs can fix this automatically for you.
          • Can you add some docs on what some of the Kafka-specific methods do, so it is possible for people to understand later?
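          The two cheap-guard suggestions above (guarding the debug log and skipping the channel call for empty batches) can be sketched generically. This is an illustration only, using java.util.logging instead of the SLF4J logger the patch actually uses, and GuardSketch is a hypothetical name:

          ```java
          import java.util.ArrayList;
          import java.util.List;
          import java.util.logging.Level;
          import java.util.logging.Logger;

          public class GuardSketch {
              private static final Logger log = Logger.getLogger("kafka-source");

              // Only pay for new String(bytes) when debug-level logging is on.
              static void logMessage(byte[] bytes) {
                  if (log.isLoggable(Level.FINE)) {
                      log.fine("Message: " + new String(bytes));
                  }
              }

              // Skip the channel call entirely when the batch is empty.
              static boolean shouldProcess(List<?> eventList) {
                  return !eventList.isEmpty();
              }

              public static void main(String[] args) {
                  logMessage("hello".getBytes());
                  System.out.println(shouldProcess(new ArrayList<String>())); // prints "false"
              }
          }
          ```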
          hshreedharan Hari Shreedharan added a comment -

          Sorry - I didn't notice that the first patch was actually attached by the original author.

          ybaniu Frank Yao added a comment -

          Hi Shapira and other guys,

          Thanks for your work on my JIRA, and I really appreciate that somebody can make it a better patch. I think you can create a new ticket for flume-ng-kafka-0.8.

          jarcec Jarek Jarcec Cecho added a comment -

          Good point, thank you for raising the concern about legality, Hari Shreedharan! I can second Gwen Shapira's sentiment. Contributors agree to including their work in ASF software by attaching their patches to the JIRA - a commit to a repository is not necessary from a legal perspective. Hence, this situation, where one contributor took the uploaded patch of another contributor to finish the work, is completely fine and has in fact happened several times during my time at Apache.

          gwenshap Gwen Shapira added a comment -

          Hari Shreedharan

          The first 3 attachments on this issue (dating Nov/Dec 2013) are contributions to the Apache project by Frank Yao.
          My patch modified the existing patches to support a new version, a different method of unit testing (miniclusters instead of mocks), batching, and manual commits.

          To the best of my understanding, the transfer of ownership to the ASF happened when the patch was attached to the JIRA. Since the original patch was uploaded here, it was contributed to the ASF, and therefore I can make changes on top and contribute it again.

          hshreedharan Hari Shreedharan added a comment -

          Hi Gwen:

          Can you confirm that it is ok to include this patch in the ASF? Theoretically, the original author of the patch (whoever owns the original code) must be the one submitting it before we can commit it. So if this is to be committed, you have to be the original author of this patch.

          gwenshap Gwen Shapira added a comment -

          pinging Hari Shreedharan for review

          gwenshap Gwen Shapira added a comment -

          New Kafka-Source patch. Features:

          • Tested with 0.8.1.1
          • Can batch writes to channel, for improved performance
          • Supports either Kafka's auto-commit or a commit per batch, to eliminate the risk of data loss
          gwenshap Gwen Shapira added a comment -

          Hi Frank Yao,

          I created a 0.8.1.1 version of the patch, tested it, and added a unit test.
          Mind if I upload it here?

          ybaniu Frank Yao added a comment -

          code is here https://github.com/baniuyao/flume-ng-kafka-source/tree/kafka_0.8.1

          I'll make that into a patch in the next few days. Compared to writing the code, making patches and tests is a little hard.

          gwenshap Gwen Shapira added a comment -

          Hey Frank Yao,

          We really want this patch in trunk.
          If you are busy, is it ok if I create the patch and you just submit it, to satisfy Apache's copyright assignment requirement?

          gwenshap Gwen Shapira added a comment -

          Frank Yao can you submit your recent 0.8 branch of flume-kafka source as a patch to this JIRA?

          StevenLeRoux Steven Le Roux added a comment -

          Hi,

          Here is a fork of the proposed patches which brings support for Kafka 0.8.x: https://github.com/AlexYangYu/flume-ng-kafka-source (branch Kafka-0.8)

          damien.claveau Damien Claveau added a comment -

          Hi,
          I have seen that the issues pointed out by the reviewer have been fixed.
          Does this mean that this source module will soon be included in the Flume roadmap?
          It is a very valuable addition.

          ybaniu Frank Yao added a comment -

          Done. Plz check

          paliwalashish Ashish Paliwal added a comment -

          Please open a review request via https://reviews.apache.org/dashboard/

          Click on New Review Request, select Repository as flume-git, choose diff file. create the request and fill details on next page. When ready, publish for review. Once done add the review link to the JIRA.

          ybaniu Frank Yao added a comment -

          Patch attached. Plz review, thanks.

          ybaniu Frank Yao added a comment -

          Source test cases are done. I used JUnit and JMock, is that okay?

          paliwalashish Ashish Paliwal added a comment -

          Reverted to Kafka 0.7.2, minor fixes. Test cases still pending.

          paliwalashish Ashish Paliwal added a comment - - edited

          Not a final patch. Used Kafka 0.8, and it seems a lot more work will be needed to make it work as expected with Kafka 0.8.
          This is just something to base further work on. Test cases are still missing; I need to add them.


            People

            • Assignee:
              gwenshap Gwen Shapira
              Reporter:
              paliwalashish Ashish Paliwal
            • Votes:
              4
              Watchers:
              12
