Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.6.0
    • Component/s: Sinks+Sources
    • Labels:
    • Release Note:
      Kafka sink implementation for the Kafka 0.8.X series of releases.

      Description

      Add support for Kafka Sink

      Attachments

      1. FLUME-2251.patch
        30 kB
        Frank Yao
      2. FLUME-2251.patch
        14 kB
        Frank Yao
      3. FLUME-2251-0.patch
        0.4 kB
        Ashish Paliwal
      4. Flume-2251-1.patch
        56 kB
        Thilina Buddhika
      5. FLUME-2251-2.patch
        58 kB
        Johny Rufus
      6. FLUME-2251-3.patch
        69 kB
        Thilina Buddhika
      7. FLUME-2251-4.patch
        59 kB
        Gwen Shapira
      8. FLUME-2251-5.patch
        46 kB
        Gwen Shapira
      9. FLUME-2251-6.patch
        47 kB
        Gwen Shapira

          Activity

          paliwalashish Ashish Paliwal added a comment -

          Package changes; kept the code on Kafka 0.7.2. Test cases are pending.

          ybaniu Frank Yao added a comment -

          Patch attached. Please review, thanks.

          damien.claveau Damien Claveau added a comment -

          Please, include this addition in the roadmap of Flume.
          It is a very valuable addition, since Apache Kafka is a natural companion of Flume and HDFS.
          I cannot explain why nobody else votes for this.

          hshreedharan Hari Shreedharan added a comment -

          Frank Yao, this looks like an excellent addition. I will review this in the coming week.

          hshreedharan Hari Shreedharan added a comment -

          Here is an initial review of this patch:

          • Please indent with 2 spaces (don't use tab characters; most IDEs can convert tabs into a configurable number of spaces).
          • The ASF header used contains a copyright notice for the author. Please use the exact header found in other files in Flume.
          • The sink logic seems a bit buggy. It looks like you are rolling back the transaction if the channel returns null. You should roll back the transaction only when the channel throws an exception or the Kafka producer fails. The channel returns null when there are no more events; simply commit the transaction and return READY at that point.
          • You need to put a try/catch around the rollback call too, since it can also throw exceptions. Simply catch the exception and log it.
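The commit/rollback rules in the review above can be sketched as follows. This is a minimal sketch, not the patch's code: `Transaction`, `Channel` and `Producer` are simplified stand-ins for the Flume and Kafka interfaces, `RecordingChannel` is a test fake whose `rollback()` deliberately throws, and the method returns READY for an empty channel because that is what the review asks for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: Transaction, Channel and Producer are simplified stand-ins,
// not Flume's or Kafka's real interfaces.
public class SinkTxSketch {
  private static final Logger LOG = Logger.getLogger(SinkTxSketch.class.getName());

  public enum Status { READY, BACKOFF }

  public interface Transaction {
    void begin();
    void commit();
    void rollback();
    void close();
  }

  public interface Channel {
    Transaction getTransaction();
    byte[] take(); // returns null when the channel is empty
  }

  public interface Producer {
    void send(byte[] message);
  }

  public static Status process(Channel channel, Producer producer) {
    Transaction tx = null;
    try {
      tx = channel.getTransaction();
      tx.begin();
      byte[] body = channel.take();
      if (body != null) {
        producer.send(body);
      }
      // An empty channel is not an error: commit (nothing was taken) and report READY.
      tx.commit();
      return Status.READY;
    } catch (RuntimeException e) {
      // Roll back only when the channel or the producer actually failed.
      LOG.log(Level.WARNING, "Publishing failed, rolling back", e);
      if (tx != null) {
        try {
          tx.rollback();
        } catch (RuntimeException re) {
          // rollback() can throw too; log it instead of masking the original failure
          LOG.log(Level.SEVERE, "Rollback failed", re);
        }
      }
      return Status.BACKOFF;
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }

  // In-memory fake whose transaction records each call; its rollback() always
  // throws, to exercise the guarded rollback path.
  public static class RecordingChannel implements Channel {
    public final Deque<byte[]> events = new ArrayDeque<>();
    public final List<String> calls = new ArrayList<>();

    @Override
    public Transaction getTransaction() {
      return new Transaction() {
        @Override public void begin() { calls.add("begin"); }
        @Override public void commit() { calls.add("commit"); }
        @Override public void rollback() {
          calls.add("rollback");
          throw new IllegalStateException("simulated rollback failure");
        }
        @Override public void close() { calls.add("close"); }
      };
    }

    @Override
    public byte[] take() {
      return events.poll();
    }
  }
}
```

With an empty channel the fake records begin, commit, close; with a producer that throws it records begin, rollback, close, and the failing rollback is logged rather than propagated. `getTransaction()` is already called inside the try here, in line with a later review comment in this thread.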
          damien.claveau Damien Claveau added a comment -

          Thank you Hari!
          Hope this will converge to a stable version in the Flume trunk soon.

          paliwalashish Ashish Paliwal added a comment -

          Hari Shreedharan, Kafka 0.7 is not in the Maven repos, so we also need to find a way around this for the build process.

          damien.claveau Damien Claveau added a comment -

          Another contributor opened a pull request in the original poster's repository
          that upgrades to the stable 0.8 release and solves this issue:

          https://github.com/baniuyao/flume-ng-kafka-sink/pull/1

          How should we deal with this situation? Can someone update the patches?

          ybaniu Frank Yao added a comment -

          Thanks Hari Shreedharan, I'll fix these problems this week. Kafka 0.8 is a little different from 0.7, and I think 0.7 is currently more stable because of some new features in 0.8.

          ybaniu Frank Yao added a comment -

          I've fixed the bugs and uploaded a new diff.

          damien.claveau Damien Claveau added a comment -

          Apache Kafka stable release is now 0.8.1.
          https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html

          Apache Camel has released Camel 2.13.0,
          which now includes a Kafka component (Producer & Consumer) for the stable Kafka 0.8.1 release.
          https://github.com/apache/camel/blob/master/parent/pom.xml#L258

          I don't think we should get stuck in Flume with the now quite old 0.7.2 version of Kafka.
          Any thoughts?

          hshreedharan Hari Shreedharan added a comment -

          Agreed. I think we should support both versions if possible, else we should go with the newer version.

          ybaniu Frank Yao added a comment -

          The difference between 0.7 and 0.8 in the Flume plugin is small. I think we can support both versions of Kafka, since 0.7 is used in production in many places.

          StevenLeRoux Steven Le Roux added a comment -

          Hi,

          Here is a fork of the proposed patches that brings support for Kafka 0.8.x: https://github.com/AlexYangYu/flume-ng-kafka-sink (branch Kafka-0.8)

          brocknoland Brock Noland added a comment -

          Hi,

          What's the status on this?

          ybaniu Frank Yao added a comment -

          I've worked it out for Kafka 0.8. Here it is: https://github.com/baniuyao/flume-ng-kafka-source/tree/kafka_0.8.1

          hshreedharan Hari Shreedharan added a comment -

          Would you mind contributing this to the Flume project by submitting a patch?

          gwenshap Gwen Shapira added a comment -

          Frank Yao, the link you posted here is for the Kafka source. This JIRA is for the Kafka sink...

          I found the following repository in the parent JIRA (FLUME-2242) with a Kafka sink implementation for Kafka 0.8: https://github.com/thilinamb/flume-ng-kafka-sink.

          I hope Thilina Buddhika will create a patch and submit it in this JIRA.

          thilinamb Thilina Buddhika added a comment -

          Hi Gwen Shapira,

          I'd be happy to contribute it to Flume. Allow me a few days to roll out a patch.

          Thanks,
          Thilina

          ybaniu Frank Yao added a comment -

          Sorry Shapira, I mixed those two up. This week is really busy; I'll submit a ticket for the 0.8 source in another thread. It's really good news that Buddhika made a 0.8 sink for flume-ng. One small question: I found a project named "kafka-client" in the Kafka source directory, and it can be built into a jar named "kafka-client.jar". It looks like a high-level wrapper around the Kafka 0.8 producer. The tricky part is that this jar, kafka-client.jar, is not in the Maven Central repo. Is Buddhika using it?

          gwenshap Gwen Shapira added a comment -

          Thanks Frank Yao for contributing. We are looking forward to the patch so we can get it into trunk and people stop getting confused by the different forks.

          gwenshap Gwen Shapira added a comment -

          Hey Thilina Buddhika,

          We really want this patch in trunk.
          If you are busy, is it OK if I create the patch and you just submit it to satisfy Apache's copyright assignment requirement?

          thilinamb Thilina Buddhika added a comment - - edited

          Hi Gwen Shapira,

          Sorry about the delay. I am halfway through creating the patch. I can get it done by Saturday if that works for you. Otherwise, please go ahead and create the patch.

          Thanks,
          Thilina

          gwenshap Gwen Shapira added a comment -

          Thilina Buddhika
          Saturday is perfect. Looking forward to your patch!

          thilinamb Thilina Buddhika added a comment -

          Please find attached the patch containing a sink implementation for Kafka 0.8.X. This is the implementation available at https://github.com/thilinamb/flume-ng-kafka-sink, ported to Flume trunk.

          thilinamb Thilina Buddhika added a comment -

          Kafka Sink implementation (for Kafka 0.8.X) + user guide update.

          gwenshap Gwen Shapira added a comment -

          Thank you Thilina Buddhika! I'm testing it out

          thilinamb Thilina Buddhika added a comment -

          Thanks Gwen Shapira. Let me know if I can help further in this regard.

          jrufus Johny Rufus added a comment - - edited

          This patch includes a few necessary pom.xml changes and some javadoc changes on top of the patch FLUME-2251-1.patch submitted by Thilina.
          A couple of issues are not included in this patch; we will track them as separate JIRAs:
          1) FLUME-2454: Support batchSize to allow multiple events per transaction to the Kafka Sink
          2) FLUME-2455: Necessary documentation changes to Flume user guide and Flume developer guide on the new Kafka sink

          thilinamb Thilina Buddhika added a comment - - edited

          Hi Johny,

          I missed attaching the patch that updates the Flume user guide, so I attached the corresponding patch to https://issues.apache.org/jira/browse/FLUME-2455.

          It seems my patch is incomplete and is missing some of the pom.xml files I updated. I think you've fixed that, but I'll attach the complete patch, including the documentation updates, here in case it is needed.

          But FLUME-2454 is still not fixed.

          Thanks for fixing it and I apologize for any inconvenience caused.

          Thanks,
          Thilina

          thilinamb Thilina Buddhika added a comment -

          Attaching the complete patch. This includes the FLUME-2455-0.patch as well.

          smolav Santiago M. Mola added a comment -

          We have another implementation of a Kafka 0.8 sink:

          https://github.com/Stratio/stratio-ingestion/tree/develop/stratio-sinks/stratio-kafka-sink

          Any feedback is welcome.

          gwenshap Gwen Shapira added a comment -

          I recommend committing FLUME-2251-2.patch to trunk, since we tested that version.

          Pinging Hari Shreedharan

          hshreedharan Hari Shreedharan added a comment -

          There are a couple of things which need to be fixed in the -2 patch (-3 does not apply):

          • The channel.getTransaction() call should be inside the try, since the channel stashes the transaction in a thread-local. So if getTransaction() throws, the next time you call it, you'd get the same transaction.
          • Because of the first point, there should be a null check before the rollback and close calls.
          • transaction.rollback() can throw, for example if the file channel cannot write to disk for some reason. That call should be in a try/catch (see other sinks), with the exception being logged.
          • The most important issue is the lack of batching in the sink. A batch size of 1 is going to give performance that is pretty much unusable. We need to fix this. This is fairly simple to do: add a config param and put the process method's code within the try in a loop (of course, one transaction per process method call, so tx.begin() etc. stays outside the loop).
          • Why not make the "MessagePreprocessor" configurable and pass the context in just when the sink starts up - the way HDFS and HBase sinks do it?
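The batching and transaction-handling points above can be sketched as a single `process()` call. This is a minimal sketch under stated assumptions: `Channel`, `Transaction` and `Producer` are simplified stand-ins rather than the Flume or Kafka APIs, `QueueChannel` is a test fake, and `batchSize` stands for a value that would be read from the sink's configuration (its name here is an assumption).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: simplified stand-ins, not Flume's or Kafka's real interfaces.
public class BatchingSinkSketch {
  public enum Status { READY, BACKOFF }

  public interface Transaction { void begin(); void commit(); void rollback(); void close(); }
  public interface Channel { Transaction getTransaction(); byte[] take(); }
  public interface Producer { void send(List<byte[]> batch); }

  private final int batchSize; // would come from the sink configuration

  public BatchingSinkSketch(int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    this.batchSize = batchSize;
  }

  public Status process(Channel channel, Producer producer) {
    Transaction tx = null;
    try {
      tx = channel.getTransaction(); // inside the try; tx stays null if this throws
      tx.begin();                    // one transaction per call, outside the loop
      List<byte[]> batch = new ArrayList<>(batchSize);
      for (int i = 0; i < batchSize; i++) {
        byte[] body = channel.take();
        if (body == null) {
          break;                     // channel drained: send what we have
        }
        batch.add(body);
      }
      if (!batch.isEmpty()) {
        producer.send(batch);        // one send per batch, not per event
      }
      tx.commit();
      return Status.READY;
    } catch (RuntimeException e) {
      if (tx != null) {              // null check before rollback
        try {
          tx.rollback();
        } catch (RuntimeException re) {
          System.err.println("rollback failed: " + re); // a real sink would log this
        }
      }
      return Status.BACKOFF;
    } finally {
      if (tx != null) {              // null check before close
        tx.close();
      }
    }
  }

  // Minimal in-memory channel for trying the sketch out; counts commits.
  public static class QueueChannel implements Channel {
    public final Deque<byte[]> events = new ArrayDeque<>();
    public int commits = 0;

    @Override
    public Transaction getTransaction() {
      return new Transaction() {
        @Override public void begin() { }
        @Override public void commit() { commits++; }
        @Override public void rollback() { }
        @Override public void close() { }
      };
    }

    @Override
    public byte[] take() {
      return events.poll();
    }
  }
}
```

With `batchSize = 2` and five queued events, successive calls send batches of 2, 2 and 1 events, each under its own transaction; a fourth call sends nothing and just commits the empty transaction.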
          hshreedharan Hari Shreedharan added a comment -

          Also, to fit in with the convention, do you think calling the MessagePreprocessor a MessageSerializer makes more sense? Instead of having 3 methods in that class, why not simply have 1? Also, like the HBaseSink, I'd rather have it return a list of KeyedMessages than just 1 (you could break a single Event into multiple KeyedMessages). So the Serializer interface would look something like:

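          import java.util.List;

          import kafka.producer.KeyedMessage;
          import org.apache.flume.Event;
          import org.apache.flume.conf.Configurable;

          public interface KafkaSerializer extends Configurable {
            List<KeyedMessage<String, byte[]>> getKeyedMessages(Event e);
          }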
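The idea that one Event may map to several KeyedMessages can be illustrated with a minimal sketch. `Event` and `KeyedMessage` here are stand-ins (not Flume's `org.apache.flume.Event` or Kafka's `kafka.producer.KeyedMessage`), `Configurable` is omitted so the example runs on its own, and the line-splitting rule and `LineSplittingSerializer` name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: Event and KeyedMessage are minimal stand-ins for Flume's Event
// and Kafka 0.8's KeyedMessage; LineSplittingSerializer is hypothetical.
public class SplittingSerializerSketch {
  public static final class Event {
    public final Map<String, String> headers;
    public final byte[] body;

    public Event(Map<String, String> headers, byte[] body) {
      this.headers = headers;
      this.body = body;
    }
  }

  public static final class KeyedMessage {
    public final String topic;
    public final String key; // may be null
    public final byte[] message;

    public KeyedMessage(String topic, String key, byte[] message) {
      this.topic = topic;
      this.key = key;
      this.message = message;
    }
  }

  // Single-method shape as proposed in the comment (Configurable omitted here).
  public interface KafkaSerializer {
    List<KeyedMessage> getKeyedMessages(Event e);
  }

  // Hypothetical serializer: one Kafka message per non-empty line of the body.
  public static final class LineSplittingSerializer implements KafkaSerializer {
    private final String topic;

    public LineSplittingSerializer(String topic) {
      this.topic = topic;
    }

    @Override
    public List<KeyedMessage> getKeyedMessages(Event e) {
      String text = new String(e.body, StandardCharsets.UTF_8);
      List<KeyedMessage> out = new ArrayList<>();
      for (String line : text.split("\n")) {
        if (!line.isEmpty()) {
          out.add(new KeyedMessage(topic, null, line.getBytes(StandardCharsets.UTF_8)));
        }
      }
      return out;
    }
  }
}
```

For a body of `"a\nb\n\nc"` this yields three messages ("a", "b", "c") on the configured topic, each with a null key; the empty line is skipped.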
          
          gwenshap Gwen Shapira added a comment -

          Hari Shreedharan, I agree with most of the comments and am working to fix them.
          However, regarding the last one, the preProcessor is most definitely a preProcessor and not a serializer.

          It actually does actions that may be a better fit for an interceptor - modifying the message, processing it to decide on a topic and partition into which the message will be published.

          Any advice on whether to remove this functionality and instead check event header for topic and partition information (so the interceptor can be used for this functionality)?

          gwenshap Gwen Shapira added a comment -

          Also, since events are byte[] and Kafka takes byte[] values, removing the preProcessor (which takes strings) can save us a few conversions.

          hshreedharan Hari Shreedharan added a comment -

          That makes sense. We can get rid of the preprocessor, but how do you decide on the topic etc. in the absence of this code? Forcing the event to have certain headers is not really the best idea. How about specifying a default topic/partition in the config, which will be the destination if the required headers are absent from the event?

          thilinamb Thilina Buddhika added a comment -

          Right now it's possible to define a default topic, but not a default partition. If a pre-processor is not configured, events will be published to this default topic with partition set to null. Then Kafka will use a random partition.

          -Thilina
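Replacing the preprocessor with optional event headers plus a configured default topic, as discussed above, could look roughly like this. It is a sketch under assumptions: the "topic" header name comes from this thread, while the "key" header name, `HeaderRoutingSketch` and `Destination` are hypothetical. A missing key is left null; per the comment above, Kafka then chooses the partition.

```java
import java.util.Map;

// Sketch only: "topic" is the header name used in this thread; the "key"
// header name and the Destination type are assumptions for illustration.
public class HeaderRoutingSketch {
  public static final class Destination {
    public final String topic;
    public final String key; // null when the event has no key header

    Destination(String topic, String key) {
      this.topic = topic;
      this.key = key;
    }
  }

  private final String defaultTopic;

  public HeaderRoutingSketch(String defaultTopic) {
    this.defaultTopic = defaultTopic;
  }

  public Destination resolve(Map<String, String> headers) {
    // One lookup per header; fall back to the configured default topic.
    String topic = headers.get("topic");
    if (topic == null) {
      topic = defaultTopic;
    }
    // A missing key stays null, leaving the partition choice to the Kafka producer.
    String key = headers.get("key");
    return new Destination(topic, key);
  }
}
```

An event with no headers resolves to the default topic and a null key; an event carrying both headers is routed by them.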

          gwenshap Gwen Shapira added a comment -

          Attaching version of the sink that fixes the try-catch issues and adds batching (but still has the pre-processor in place).

          I'm going to go ahead and remove the pre-processor and replace it with event headers (for use with interceptors), but I wanted to include this version in case Thilina Buddhika has a requirement that the interceptors don't solve.

          gwenshap Gwen Shapira added a comment -

          Removed the "preprocessor" code and added support for optional headers. Fixed the unit tests to match.

          Hari Shreedharan please review

          hshreedharan Hari Shreedharan added a comment -

          Some minor comments:

          • Please put all code in an if conditional inside {} even if it is just one line.
          • There are some lines > 80 characters. Please wrap them.
          • These lines are in the hot path:
                    logger.debug("{Event} " + eventTopic + " : "+ eventKey + " : " + new String(eventBody,"UTF-8"));
            
                    logger.debug("event #{}",processedEvents);
            

            Please wrap them in if (logger.isDebugEnabled()).

          • Formatting is off (should be 1 less indent):
                    // publish batch and commit.
                    producer.send(messageList);
                    transaction.commit();
            
          • The messageList can be reused between process calls to avoid reallocation of the array each time. You can pre-allocate it to batchSize.
          • You probably want to check for messageList.size() > 0 before calling producer.send().
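The hot-path logging, list reuse and empty-batch items above can be sketched together. In this sketch java.util.logging stands in for SLF4J (`Logger.isLoggable(Level.FINE)` plays the role of `isDebugEnabled()`); `HotPathSketch`, `publish` and the `debugStringsBuilt` counter are hypothetical, and the actual producer call is left as a comment.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: java.util.logging stands in for SLF4J; the names are hypothetical.
public class HotPathSketch {
  private static final Logger LOG = Logger.getLogger(HotPathSketch.class.getName());

  private final List<byte[]> messageList; // allocated once, reused across calls
  public int debugStringsBuilt = 0;       // counts how often the costly string was built

  public HotPathSketch(int batchSize) {
    this.messageList = new ArrayList<>(batchSize); // pre-sized to the batch size
  }

  /** Collects one batch and returns how many messages would be sent. */
  public int publish(List<byte[]> bodies, String topic) {
    messageList.clear(); // keeps the backing array, so no reallocation per call
    for (byte[] body : bodies) {
      if (LOG.isLoggable(Level.FINE)) {
        // Concatenation and byte[] -> String decoding only happen when debug is on.
        debugStringsBuilt++;
        LOG.fine("{Event} " + topic + " : " + new String(body, StandardCharsets.UTF_8));
      }
      messageList.add(body);
    }
    if (messageList.isEmpty()) {
      return 0; // nothing to publish: skip the producer call entirely
    }
    // producer.send(messageList) would go here
    return messageList.size();
  }
}
```

Without the guard, the string building runs for every event even when debug output is discarded, because Java evaluates the argument before `debug()` is called. Using the `StandardCharsets.UTF_8` overload also avoids the checked `UnsupportedEncodingException` that `new String(bytes, "UTF-8")` declares.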
          gwenshap Gwen Shapira added a comment -

          Fixed the nits

          Thanks for noticing the list pre-allocation bit. Also added a test for handling an empty channel.

          hshreedharan Hari Shreedharan added a comment -

          +1.

          I made one change (below) and a couple of formatting changes. I will commit this after running tests.

          • This code causes two lookups into the map:
                    if (headers.containsKey("topic")) {
                      eventTopic = headers.get("topic");
                    }
            

            I changed this to:

            if((eventTopic = headers.get("topic")) == null) {
              eventTopic = topic;
            }
            

          Looking at the Kafka code, it looks like KeyedMessage can have a null key - which in this case it will have if the key is not specified in the event, correct?

          jira-bot ASF subversion and git services added a comment -

          Commit 75f748cbd101d6efe8463a1c747fb87d2f668091 in flume's branch refs/heads/trunk from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=75f748c ]

          FLUME-2251. Kafka Sink.

          (Thilina Buddhika, Gwen Shapira via Hari)

          jira-bot ASF subversion and git services added a comment -

          Commit 57d651105b4f72bcc7ea3ecd235b05bfa975c739 in flume's branch refs/heads/flume-1.6 from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=57d6511 ]

          FLUME-2251. Kafka Sink.

          (Thilina Buddhika, Gwen Shapira via Hari)

          hshreedharan Hari Shreedharan added a comment -

          Committed! Thanks Thilina and Gwen!

          hudson Hudson added a comment -

          UNSTABLE: Integrated in flume-trunk #658 (See https://builds.apache.org/job/flume-trunk/658/)
          FLUME-2251. Kafka Sink. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=75f748cbd101d6efe8463a1c747fb87d2f668091)

          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
          • flume-ng-dist/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
          • flume-ng-sinks/pom.xml
          • pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
          hudson Hudson added a comment -

          UNSTABLE: Integrated in Flume-trunk-hbase-98 #18 (See https://builds.apache.org/job/Flume-trunk-hbase-98/18/)
          FLUME-2251. Kafka Sink. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=75f748cbd101d6efe8463a1c747fb87d2f668091)

          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
          • flume-ng-sinks/flume-ng-kafka-sink/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
          • flume-ng-dist/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
          • flume-ng-sinks/pom.xml
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties
          • flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
          • pom.xml

            People

            • Assignee:
              gwenshap Gwen Shapira
              Reporter:
              paliwalashish Ashish Paliwal
            • Votes:
              4
              Watchers:
              16
