Flume / FLUME-2015

ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v1.3.0, v1.3.1
    • Fix Version/s: v1.4.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      I need more control over the indexing performed by the ElasticSearchSink – in particular I need access to an IndexRequestBuilder instance during flume event processing. The interactions between ElasticSearchSink and ElasticSearchEventSerializer currently do not make this possible.

      I have authored a patch that meets my needs and maintains backwards-compatibility for existing users of the sink. It is available at https://github.com/prime8/flume/commit/1056c129b10c95cee50e0c3f77e309668f82bfc6 – please let me know if it can be pulled into the main ASF source tree!

      Thanks

      1. FLUME-2015-1.4.0-0.patch
        31 kB
        Tim Bacon
      2. FLUME-2015-doc-1.4.0-0.patch
        7 kB
        Edward Sargisson
      3. FLUME-2015-1.4.0-1.patch
        34 kB
        Tim Bacon
      4. FLUME-2015-1.4.0-01.patch
        14 kB
        Tim Bacon
      5. FLUME-2015-2.patch
        22 kB
        Mike Percy


          Activity

          Tim Bacon added a comment -

          Hi Nikolaos,

          your code needs to implement the ElasticSearchIndexRequestBuilderFactory interface. The default implementation EventSerializerIndexRequestBuilderFactory extends AbstractElasticSearchIndexRequestBuilderFactory as that provides access to some convenience methods.

          HTH,
          Tim
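          The split Tim describes (an interface, an abstract base that holds shared convenience logic, and a concrete factory built on top of it) can be sketched in a self-contained way. The real Flume classes depend on the Elasticsearch client, so every name below (`FactorySketch`, `SimpleEvent`, `IndexRequest`, `IndexRequestFactory`, `AbstractIndexRequestFactory`, `HeaderIdIndexRequestFactory`) is a hypothetical stand-in, not the Flume API, and reading the document ID from an `id` header is an assumption made only for illustration.

```java
import java.util.Map;

/** Hypothetical stand-ins illustrating the interface / abstract base / concrete factory split. */
final class FactorySketch {

  /** Hypothetical stand-in for a Flume event: headers plus a raw body. */
  static final class SimpleEvent {
    final Map<String, String> headers;
    final byte[] body;

    SimpleEvent(Map<String, String> headers, byte[] body) {
      this.headers = headers;
      this.body = body;
    }
  }

  /** Hypothetical stand-in for an Elasticsearch index request. */
  static final class IndexRequest {
    String index;
    String type;
    String id; // null means "let the cluster generate the ID"
    byte[] source;
  }

  /** Counterpart of the factory interface: builds one request per event. */
  interface IndexRequestFactory {
    IndexRequest createIndexRequest(String indexName, String indexType, SimpleEvent event);
  }

  /** Counterpart of the abstract base: owns request creation, delegates population to a hook. */
  abstract static class AbstractIndexRequestFactory implements IndexRequestFactory {
    @Override
    public final IndexRequest createIndexRequest(
        String indexName, String indexType, SimpleEvent event) {
      IndexRequest request = new IndexRequest();
      prepareIndexRequest(request, indexName, indexType, event);
      return request;
    }

    /** Hook that subclasses implement to fill in index, type, ID and source. */
    protected abstract void prepareIndexRequest(
        IndexRequest request, String indexName, String indexType, SimpleEvent event);
  }

  /** A custom factory that takes the document ID from a (hypothetical) "id" header. */
  static final class HeaderIdIndexRequestFactory extends AbstractIndexRequestFactory {
    @Override
    protected void prepareIndexRequest(
        IndexRequest request, String indexName, String indexType, SimpleEvent event) {
      request.index = indexName;
      request.type = indexType;
      request.id = event.headers.get("id"); // stays null when the header is absent
      request.source = event.body;
    }
  }
}
```

          In the real sink, a custom class would implement `ElasticSearchIndexRequestBuilderFactory` (or extend `AbstractElasticSearchIndexRequestBuilderFactory`); check the Flume User Guide for how the sink is configured to use it.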

          Nikolaos Tsipas added a comment -

          Hello Edward, thanks for your reply. I guess that you mean to create a custom IndexRequestBuilder, right?

          Edward Sargisson added a comment -

          Hi Nick,
          Create a custom IndexRequestBuilder that calculates the ID you want for the document.
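          How the ID is calculated is up to the custom code. One common choice (an assumption here, not something this thread prescribes) is to derive it deterministically from the event body, so that an event delivered twice overwrites the same document instead of creating a duplicate. `DocumentIds.fromBody` is a hypothetical helper; it uses only the JDK's `MessageDigest`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a stable document ID from the raw event body. */
final class DocumentIds {
  private DocumentIds() {}

  /** Returns the lowercase hex SHA-256 digest of the body; identical bodies give identical IDs. */
  static String fromBody(byte[] body) {
    try {
      // SHA-256 is one of the digests every Java platform implementation must support.
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(body);
      StringBuilder hex = new StringBuilder(hash.length * 2);
      for (byte b : hash) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }
}
```

          Hashing the whole body means events with byte-identical bodies collapse into one document. That is the point for redelivery, but it is wrong if distinct events can share a body; if the payload carries a natural key, using that key is usually the better choice.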

          Nikolaos Tsipas added a comment -

          Hello,

          I want to be able to set the IDs of documents saved in Elasticsearch by the Flume Elasticsearch sink. This ticket should provide this functionality, but I can't find any documentation explaining how to achieve it.

          Kind Regards,
          Nick

          Hudson added a comment -

          Integrated in flume-trunk #408 (See https://builds.apache.org/job/flume-trunk/408/)
          FLUME-2015. ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing (Revision 6b910371268685122c2d9fbff8e7ed7f46acbcbb)

          Result = SUCCESS
          mpercy : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=6b910371268685122c2d9fbff8e7ed7f46acbcbb
          Files :

          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
          Mike Percy added a comment -

          By the way, feel free to file follow-up JIRAs on any tweaks you feel should be made to this, like warnings or whatever

          Mike Percy added a comment -

          Edward, great to hear

          Mike Percy added a comment -

          Pushed to trunk and flume-1.4 branches.

          Thanks for the code patch Tim, and thanks for the review and doc patch Edward!

          By the way, you have both been added as Flume contributors, so you can assign yourselves JIRAs now.

          Edward Sargisson added a comment -

          Hi Mike!
          Yay! I was just about to report that I've run Tim's patch (FLUME-2015-1.4.0-1.patch) through my own set of functional tests and everything passed first time.

          Mike Percy added a comment -

          +1

          Thanks guys for working together on this! I am going to commit the combined patch which includes the docs. Attaching the full patch I am committing.

          Tim Bacon added a comment -

          I've updated the RB with the cumulative patch. Thanks for taking the time for the review!

          Mike Percy added a comment -

          Oops, thanks for the clarification, Tim! My mistake. Yes, cumulative is best. I will try to review this again tomorrow.

          Tim Bacon added a comment - edited

          Mike, thanks for the review.

          I don't think there is a need for a separate interface because (as I said on the RB) the new return type is the interface from which the old return type is subclassed. Hope you agree?!
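          Tim's argument rests on Java's covariant return types: when an interface method's declared return type is widened to a supertype, an existing implementation that still declares the narrower subtype continues to override it and compiles unchanged. The names below (`ContentStream`, `JsonContentBuilder`, `EventSerializer`, `LegacySerializer`) are hypothetical stand-ins for the real Elasticsearch and Flume types.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical supertype: the new, wider return type. */
interface ContentStream {
  byte[] bytes();
}

/** Hypothetical subtype: the type the old interface method used to return. */
class JsonContentBuilder implements ContentStream {
  private final String json;

  JsonContentBuilder(String json) {
    this.json = json;
  }

  @Override
  public byte[] bytes() {
    return json.getBytes(StandardCharsets.UTF_8);
  }
}

/** The interface after the change: it now declares the wider type. */
interface EventSerializer {
  ContentStream getContentBuilder(String event);
}

/** A plugin written against the old contract: it still declares the narrower subtype. */
class LegacySerializer implements EventSerializer {
  @Override
  public JsonContentBuilder getContentBuilder(String event) { // covariant override
    return new JsonContentBuilder("{\"body\":\"" + event + "\"}");
  }
}
```

          Note that this is source compatibility: a plugin class already compiled against the old interface has no bridge method for the wider signature, so it needs to be recompiled against the new interface to avoid an `AbstractMethodError` at runtime.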

          There is one further change that I would like to make however: the new interface does not currently extend Configurable / ConfigurableComponent. I have attached a standalone patch https://issues.apache.org/jira/secure/attachment/12582445/FLUME-2015-1.4.0-01.patch that contains this new change, and a cumulative patch https://issues.apache.org/jira/secure/attachment/12582444/FLUME-2015-1.4.0-1.patch that contains both the additional and prior changes.

          Please let me know if you would like this additional patch to go through the RB as well.

          Tim Bacon added a comment -

          Cumulative patch

          Tim Bacon added a comment -

          Additional patch: ElasticSearchIndexRequestBuilderFactory should be Configurable

          Mike Percy added a comment -

          Oops, I meant like Edward did in FLUME-1782 (not FLUME-1972).

          Mike Percy added a comment -

          Looks good Tim, left feedback on the RB indicating we should make a separate interface for the serializer like Edward did in FLUME-1972

          Thanks
          Mike

          Tim Bacon added a comment - edited

          Thanks for the positive feedback! I've attached a link to the Code Review Board at the top of the page.

          In reply to your comments:

          • My (non-logging-related) use case has no need for the timestamp to be provided as a header on every event, which is why I didn't add the warnings. If this makes the patch unsuitable for your LogStash use case I can always add it back in such a way as to avoid the warnings in my own code path...
          • There were two reasons why I wrapped the incoming event rather than adding the timestamp as another param:
            • I was striving to keep backwards-compatibility (speaking as someone who has been burned by the lack of it in the past!) and the contract with existing clients is based only on the event param
            • The timestamp used to build the index name should match the timestamp value that goes into the index, and it's only at the level above ElasticSearchLogStashEventSerializer that consistency can be ensured
          • Having said that I understand your concern about GC – and, if it helps, I haven't noticed any performance hit while running the new code in my own flume test environment (with several million events per day)

          Rgds,
          Tim

          Edward Sargisson added a comment -

          Hi Tim,
          I've just reviewed your new patch and it's nice work. I think you should proceed with the next step of setting up a review on the Review Board: https://reviews.apache.org. I believe Israel provided you with details on how to do this.

          I've just written what I think the docs should say (given all the other changes I think should be made to the docs) and include it here as FLUME-2015-doc-1.4.0-0.patch. This is a separate patch to the code patch you provided.

          In our local environment I have built a version of 1.4.0 with your patch, modified our extensions to work with it and run it through our continuous integration tests successfully.

          Comments:

          • I note that you didn't include the code to warn periodically when the timestamp is not provided. In my view, I think the operators should be warned of this because it affects the timestamp the events are written with - and thus how they can be found.
          • I note that you copy the entire event in order to wrap it into a TimestampEvent. I was initially concerned because this would lead to a lot of garbage - leading to large GC pauses. However, I've convinced myself that the new objects are too short lived for this to matter. I would have preferred keeping the original event and passing the timestamp along beside it.
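          Edward's two comments (warn operators when events lack a timestamp, and keep the original event while passing the timestamp beside it) combine naturally with Tim's point that the index name and the indexed value must use the same timestamp. The sketch below assumes the timestamp arrives as epoch milliseconds in a header named `timestamp` (the name Flume's timestamp interceptor uses, but treat it as an assumption here); `TimestampResolver`, `resolve`, and the warning interval are hypothetical. The resolved value is returned instead of being written into a copied event.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.logging.Logger;

/** Hypothetical helper: resolves an event's timestamp once, warning at most once per interval on fallback. */
final class TimestampResolver {
  private static final Logger LOG = Logger.getLogger(TimestampResolver.class.getName());
  static final String TIMESTAMP_HEADER = "timestamp"; // assumed header name

  private final LongSupplier clock;
  private final long warnIntervalMillis;
  private long lastWarnMillis = Long.MIN_VALUE; // sentinel: no warning emitted yet
  private int warningsEmitted = 0;

  TimestampResolver(LongSupplier clock, long warnIntervalMillis) {
    this.clock = clock;
    this.warnIntervalMillis = warnIntervalMillis;
  }

  /** Returns the header value as epoch millis, or the clock time if the header is missing or malformed. */
  synchronized long resolve(Map<String, String> headers) {
    String raw = headers.get(TIMESTAMP_HEADER);
    if (raw != null) {
      try {
        return Long.parseLong(raw.trim());
      } catch (NumberFormatException ignored) {
        // fall through: use the clock and (maybe) warn
      }
    }
    long now = clock.getAsLong();
    // Check the sentinel first so that "now - Long.MIN_VALUE" is never evaluated.
    if (lastWarnMillis == Long.MIN_VALUE || now - lastWarnMillis >= warnIntervalMillis) {
      lastWarnMillis = now;
      warningsEmitted++;
      LOG.warning("Event has no usable '" + TIMESTAMP_HEADER + "' header; indexing it under the current time");
    }
    return now;
  }

  synchronized int warningsEmitted() {
    return warningsEmitted;
  }
}
```

          A caller would resolve once per event and pass that single value both to the index-name calculation and into the document, so the two cannot drift apart, and no copy of the event is needed.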
          Tim Bacon added a comment -

          I've attached a full patch incorporating:

          • my original code plus the interface rename from Builder-er to Factory
          • the (UTC) determination of the correct index to write to per FLUME-1782
          • extensible abstract and concrete implementations of the interface that I believe will satisfy FLUME-1972 as well as my own needs
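          The UTC point matters because a daily index is chosen from the event's calendar date: if that date were computed in each agent's local time zone, agents in different zones could put the same instant into different daily indices. Flume's user guide describes index names of the form `<indexName>-yyyy-MM-dd`; the sketch below reproduces that naming with `java.time`, assuming epoch-millisecond timestamps. `DailyIndexNames.indexName` is a hypothetical helper, not the patch's code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: daily index names computed in UTC, independent of the agent's time zone. */
final class DailyIndexNames {
  // The formatter carries a UTC override zone, so it can format an Instant directly.
  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

  private DailyIndexNames() {}

  /** For example, prefix "flume" and an instant on 9 May 2013 (UTC) give "flume-2013-05-09". */
  static String indexName(String prefix, long epochMillis) {
    return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
  }
}
```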

          It does NOT incorporate the user guide changes – I'd like some guidance on what exactly to add (if anything) given that this patch is backwards-compatible.

          Looking forward to getting some feedback

          Tim Bacon added a comment -

          Mike: thanks for the pointer to Edward's work and the info on how to properly attach patches.

          My motivating use case is that

          • my flume events have no structured headers and are not logging-related
          • there is a structured avro blob in the event bytes and I want to use avro to turn those bytes into the JSON that will be stored in ElasticSearch (i.e. bypass XContentBuilder entirely)
          • I need to be able to set the ElasticSearch ID, index name and possibly index type from the actual event content (similar to FLUME-1972)

          Edward: thanks for the info about the work you have already done and the comments on my patch.

          I'd be delighted to merge the existing patch https://issues.apache.org/jira/secure/attachment/12579897/FLUME-1782-FLUME-1972-1.4.0-1.patch with the work I've done – and rename the Builder-er to Factory – then attach the result to this issue for review. For starters I'll add the timestamp as a method param along with the index name/type. I'll probably need 24 hours or so for turnaround.

          Let me know if you would like me to also take a stab at the user guide changes.

          Thanks!
          Tim

          Edward Sargisson added a comment -

          I've been thinking about this overnight - this is a great patch:

          • I think FLUME-1972 should be abandoned in favour of this patch - this patch is a much more generic and usable way to achieve the same goal. I also think the ElasticSearchEventSerializer2 interface from FLUME-1782 should be abandoned in favour of this patch.
          • I still think its name should be ElasticSearchIndexRequestBuilderFactory.
          • We should consider what we want to do with the possibly generated timestamp and the index name. We could pass both of those to the ElasticSearchIndexRequestBuilderFactory - or we could move the timestamp and index name generation to a default and/or abstract ElasticSearchIndexRequestBuilderFactory.
          • We will need to update the Flume User Guide.

          I'm not sure I can justify using my employer's time on this - at least not for a few days.

          Edward Sargisson added a comment -

          Hi Tim,
          Great! Being able to control the indexing in more detail is something that had occurred to me too so thank you for putting this together.

          As I see it, we have a few issues to resolve to bring this in.
          Firstly, the basic defect in FLUME-1782 isn't fixed so the incorrect index name is calculated in many conditions.
          Secondly, in order to fix that defect in a way that doesn't break existing implementations I've created an ElasticSearchEventSerializer2 interface which takes the timestamp the sink decided to provide - if it didn't find one in the event (I only put that patch up about an hour ago).
          Thirdly, FLUME-1972 contains the patch to provide the ID to elasticsearch if a user wishes to (instead of relying on elasticsearch generating it).
          Fourthly, I don't see any update to the Flume User Guide.

          So, how about we do this:

          • if you could possibly modify your patch to use ElasticSearchEventSerializer2 and ElasticSearchIdProvider, that would be really great.
          • hopefully the Flume committers can either get FLUME-1782 and FLUME-1972 reviewed and committed - or wait on your work.

          Nit: Builderer isn't English... How about ElasticSearchIndexRequestBuilderFactory?

          Mike Percy added a comment -

          Hi Tim,
          Thanks very much for the patch! I think you will be happy to know that there is current work ongoing on a new serializer interface, as the existing API cannot be modified due to a desire to maintain backwards API compatibility for Flume plugin writers.

          It would be really great if you could coordinate with Edward Sargisson since he has a patch up to implement a new serializer in FLUME-1782, and it would be unfortunate if that new interface would not be able to handle your use case.

          A couple of additional things I'd also ask of you:

          • Can you provide a bit more information about what your motivating use case is for this API enhancement?
          • Please check out the Providing Patches section of our How to Contribute page to find out how to attach a patch to a JIRA ticket.

          Thanks!

          Regards,
          Mike

            People

            • Assignee:
              Tim Bacon
              Reporter:
              Tim Bacon
            • Votes:
              0
              Watchers:
              5

              Dates

              • Created:
                Updated:
                Resolved:
