Flume / FLUME-2126

Problem in elasticsearch sink when the event body is a complex field

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: v1.6.0
    • Component/s: Sinks+Sources
    • Labels:
      None
    • Environment:

      1.3.1 and 1.4

      Description

      I have found a bug in the elasticsearch sink. The problem is in the ContentBuilderUtil.addComplexField method: when it calls builder.field(fieldName, tmp), the tmp object is passed as a plain Object and ends up serialized by the XContentBuilder via its toString method. As a result, you get the object reference as content.

      The following change works around the problem for me. The downside is that it has to parse the content twice; I guess there is a better way to solve the problem, but I am not an elasticsearch API expert.

      --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
      +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
      @@ -61,7 +61,12 @@ public class ContentBuilderUtil {
             parser = XContentFactory.xContent(contentType).createParser(data);
             parser.nextToken();
             tmp.copyCurrentStructure(parser);
      -      builder.field(fieldName, tmp);
      +
      +      // if it is a valid structure then we include it
      +      parser = XContentFactory.xContent(contentType).createParser(data);
      +      parser.nextToken();
      +      builder.field(fieldName);
      +      builder.copyCurrentStructure(parser);
           } catch (JsonParseException ex) {
             // If we get an exception here the most likely cause is nested JSON that
             // can't be figured out in the body. At this point just push it through
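The failure mode can be reproduced outside Elasticsearch with a plain-Java sketch. SketchBuilder below is a made-up stand-in for XContentBuilder, not the real API: serializers whose field(String, Object) has no dedicated handler for a builder-typed value commonly fall back to Object#toString, which yields the object reference rather than the content.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a content builder whose field(String, Object)
// has no special handling for builder-typed values, so it falls back to
// Object#toString -- the same failure mode described in this issue.
class SketchBuilder {
    private final Map<String, String> fields = new LinkedHashMap<>();

    SketchBuilder field(String name, Object value) {
        // No dedicated handler for SketchBuilder values: String.valueOf()
        // on an arbitrary object yields something like "SketchBuilder@1a2b3c".
        fields.put(name, String.valueOf(value));
        return this;
    }

    String render() {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        SketchBuilder nested = new SketchBuilder().field("foo", "bar");
        SketchBuilder doc = new SketchBuilder().field("body", nested);
        // The nested object's reference string, not its content, ends up in the document.
        System.out.println(doc.render());
    }
}
```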
      
      1. FLUME-2126-0.patch
        2 kB
        Ashish Paliwal

        Issue Links

          Activity

          Deepak Subhramanian added a comment -

          I am having the same problem while posting JSON data. I am thinking about using the temporary fix suggested. Are there any plans to fix this in future versions?

          Ashish Paliwal added a comment -

          We are currently in a code freeze waiting for the 1.5 release. I shall submit a patch as soon as 1.5 is released.

          Deepak Subhramanian added a comment -

          Thanks Ashish. I also noticed that when a batch fails, it reposts the data continuously instead of timing out after a certain time.

          Edward Sargisson added a comment -

          We need to be careful when testing this section. The exception catches here are designed to do much the same job.

          There have been at least two previous work items where we've had to enhance what is accepted.

          Miguel Santos added a comment -

          To confirm: the fix will not be in 1.5?

          Hari Shreedharan added a comment -

          Unfortunately, no. Flume 1.5 release is complete. I was only waiting for maven central to sync up - which it has. I will be sending out the announcement today.

          Ashish Paliwal added a comment -

          Edward Sargisson Shall I submit the patch as discussed on the ML, or is it not in line with your thinking?

          Deepak Subhramanian added a comment -

          I also created another issue related to the ES sink: the ESSink sends messages many times if one of the events fails validation at ES because of JSON mapping.
          https://issues.apache.org/jira/browse/FLUME-2390

          Edward Sargisson added a comment -

          Ashish Paliwal By all means submit a patch. Patches are greatly encouraged!

          I'm raising the concern that this particular section of code has proven a bit problematic in making sure everybody's data gets through to elasticsearch. We've had a number of different data formats fail at this point, so we should be thoughtful about it.

          Andrew Sammut added a comment -


          I'm interested in this functionality making it into an upcoming release; however, as stated earlier, it must contain some form of exception handling. The incoming field type must match the existing field, and if not, the record should be dropped (or altered on the fly). I've seen this fail first-hand and have resolved it by either changing the Elasticsearch mapping or using morphline to adjust the field type. Neither of those situations is ideal.

          Hari Shreedharan added a comment -

          Ashish Paliwal - Do you have a patch for this one?

          Ashish Paliwal added a comment -

          Hari Shreedharan Let me check once I'm back from vacation on Monday. I should have it somewhere.

          Ashish Paliwal added a comment -

          Hari Shreedharan Added the patch and review request

          Edward Sargisson added a comment -

          Ashish Paliwal Thanks for the patch. I will review when I have a minute.

          Ashish Paliwal added a comment -

          Edward Sargisson Thanks! It's a single-line change as discussed on the ML. This portion might need further changes to support additional content types. I added the fix based on the current implementation.

          Edward Sargisson added a comment -

          I'm happy with this patch and recommend it be merged.

          I reviewed Jira for related work items to see if they should be included.

          Once it's in, we should review FLUME-2476, as I believe it's a duplicate.
          I would also like to get the failing data from FLUME-2390 and write a test for that - however, I think that is a separate defect.

          Hari Shreedharan added a comment -

          +1. Committing this

          ASF subversion and git services added a comment -

          Commit 8328bccd41077d457cab064541127fc993e97619 in flume's branch refs/heads/trunk from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=8328bcc ]

          FLUME-2126. Problem in elasticsearch sink when the event body is a complex field

          (Ashish Paliwal via Hari)

          ASF subversion and git services added a comment -

          Commit 5093d98189a15e82b223597eb24491cbcb7340db in flume's branch refs/heads/flume-1.6 from Hari Shreedharan
          [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=5093d98 ]

          FLUME-2126. Problem in elasticsearch sink when the event body is a complex field

          (Ashish Paliwal via Hari)

          Hari Shreedharan added a comment -

          Committed! Thanks Ashish Paliwal for the patch and Edward Sargisson for the review!

          Hudson added a comment -

          UNSTABLE: Integrated in Flume-trunk-hbase-98 #34 (See https://builds.apache.org/job/Flume-trunk-hbase-98/34/)
          FLUME-2126. Problem in elasticsearch sink when the event body is a complex field (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=8328bccd41077d457cab064541127fc993e97619)

          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
          Hudson added a comment -

          FAILURE: Integrated in flume-trunk #676 (See https://builds.apache.org/job/flume-trunk/676/)
          FLUME-2126. Problem in elasticsearch sink when the event body is a complex field (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=8328bccd41077d457cab064541127fc993e97619)

          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
          • flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
          Francis added a comment -

          I'm not sure I understand the meaning of this comment. Is it fixed or not? The bug is marked as fixed, but if the integration failed, it means it's not fixed, right?

          Edward Sargisson added a comment -

          Francis It's there. Link to the commit below. The Flume build is often quite unstable and fails for all kinds of reasons.
          https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blobdiff;f=flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java;h=70d0b8689a462d13fd80d73d9bce314b9f620e08;hp=bf7c57c7d705acc544f13f191be6ade4eb6dbb80;hb=5093d98;hpb=ebde0dac57e0b3cd21319cfdddd4d482d0a9fffd
          Hari Shreedharan added a comment -

          I have looked at the build issues; there is not a whole lot we can do. Most of it is due to the shared infra our builds run on, though sometimes some of our tests have hard-coded ports (sometimes they are out of our control, as with mini clusters). Build locally and verify; that is what most committers do.

          Francis added a comment -

          I don't understand how this will fix the bug. By calling tmp.string(), the field will be added as a string. The JsonXContentGenerator will then call jackson.JsonGenerator.writeString and the string will become invalid Json, as it will be escaped. The jackson.JsonGenerator.writeString documentation is clear:

          "Method for outputting a String value. Depending on context this means either array element, (object) field value or a stand alone String; but in all cases, String will be surrounded in double quotes, and contents will be properly escaped as required by Json specification."

          I tried the patch and this is exactly what I get. For example, when the body of an event is {"foo":"bar"}, the resulting document in ES will contain "{\"foo\":\"bar\"}". ES views the field as plain text and not JSON.

          I really don't understand what the elasticsearch sink is trying to do. If it detects that the field is JSON, it will parse it to make sure it's valid JSON, but it will then be added as plain text. That's almost the same as if all fields were added using the addSimpleField method, minus the JSON validation! The original code would have been fine if the ES Java API documentation were right. It says: "By the way, the field method accepts many object types. You can directly pass numbers, dates and even other XContentBuilder objects". But looking at the source code, this is clearly wrong: there is no field method accepting an XContentBuilder as a value. To get around this issue, I think the sink should call rawField when it detects a field as JSON. This will ensure that the string won't be escaped and will be treated as a JSON field by ES.

          Does that make sense, or am I missing something here?
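The escaping behaviour described in this comment can be sketched in plain Java. The helper names below are hypothetical; the real path goes through the ES XContentBuilder and Jackson's JsonGenerator.writeString, but the string-versus-raw distinction is the same.

```java
// Plain-Java sketch (hypothetical helper names) of the difference between
// writing a JSON payload as an escaped string field -- what calling
// field(name, tmp.string()) does -- and splicing it in raw, as rawField() would.
class RawVsStringSketch {
    // Minimal JSON string escaping: backslashes and quotes only.
    static String escapeJson(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Analogue of builder.field(name, jsonText): the value is escaped text.
    static String asStringField(String name, String json) {
        return "{\"" + name + "\":\"" + escapeJson(json) + "\"}";
    }

    // Analogue of builder.rawField(name, data): the bytes go in verbatim.
    static String asRawField(String name, String json) {
        return "{\"" + name + "\":" + json + "}";
    }

    public static void main(String[] args) {
        String body = "{\"foo\":\"bar\"}";
        // Escaped: ES sees a plain string -> {"@message":"{\"foo\":\"bar\"}"}
        System.out.println(asStringField("@message", body));
        // Raw: ES sees a nested object -> {"@message":{"foo":"bar"}}
        System.out.println(asRawField("@message", body));
    }
}
```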

          Xuri Nagarin added a comment -

          Would like to add - I have been playing with the ElasticSearch sink and ElasticSearchLogStashEventSerializer. In my case, pulling JSON from Kafka, the flume event header consists of (topic, key, sub-key) and the event body is where the message goes. The Logstash serializer takes the event body and sticks it in the "message" field of the document sent to ES, so my entire JSON gets sent to the ES record as a string.

          Francis added a comment -

          Is it possible to reopen this issue?

          Hari Shreedharan added a comment -

          Edward Sargisson - Do you think this issue needs to be reopened and a new fix put in? I am quite unfamiliar with this code, so I leave it to you.

          Edward Sargisson added a comment -

          Hari Shreedharan yes, we probably should.

          This particular section of code has always been particularly problematic. A variety of different formats come through it and users expect them all to be written to ES safely. Sometimes, the formats break this code, or ES ingestion, and then the queue gets blocked.

          I'm not convinced that we have a body of test data that covers what everybody wants. Perhaps those people that are interested can post a copy of a Flume event that breaks ES and how they expect to see it in ES?

          Hari Shreedharan added a comment -

          Reopening based on above discussion.

          Xuri Nagarin added a comment -

          Edward Sargisson How would you like the Flume event to be captured/posted? In my case, I have JSON coming in via the Kafka source. Right now, I am using morphlines to extract the Kafka message and populate all the key/value pairs in the flume event header so the ES sink can properly populate the ES index. It would be much nicer if the ES sink or LogStash serializer could (1) detect the message type, and (2) if the message/flume-body is structured, populate the ES index with key/value pairs from the flume header and body content. If the flume event body isn't recognized as a structured message, then shove it in the "message" field (current behaviour).

          I believe the relevant code is:

          builder.startObject("@fields");
          for (String key : headers.keySet()) {
            byte[] val = headers.get(key).getBytes(charset);
            ContentBuilderUtil.appendField(builder, key, val);
          }
          builder.endObject();

          https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java

          Add iterating over the body if the body is structured?
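A sketch of what "iterating over the body" might look like (plain Java; the parsing here is a deliberately naive illustration, and all names are hypothetical - a real serializer would use the ES XContent parser):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "iterating over the body if the body is structured":
// if the body looks like a flat {"k":"v",...} JSON object, emit one field per
// pair; otherwise fall back to a single "message" field (current behaviour).
class BodyFlattenSketch {
    static Map<String, String> fieldsFor(String body) {
        String t = body.trim();
        if (t.startsWith("{") && t.endsWith("}") && t.length() > 2) {
            Map<String, String> out = new LinkedHashMap<>();
            // Extremely naive flat-object parse, for illustration only.
            for (String pair : t.substring(1, t.length() - 1).split(",")) {
                String[] kv = pair.split(":", 2);
                if (kv.length != 2) {
                    return fallback(body); // not a simple flat object
                }
                out.put(unquote(kv[0]), unquote(kv[1]));
            }
            return out;
        }
        return fallback(body);
    }

    private static Map<String, String> fallback(String body) {
        Map<String, String> out = new LinkedHashMap<>();
        out.put("message", body);
        return out;
    }

    private static String unquote(String s) {
        s = s.trim();
        return (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\""))
                ? s.substring(1, s.length() - 1) : s;
    }
}
```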

          Edward Sargisson added a comment -

          Hi Xuri Nagarin,
          Re: format.
          Eventually, all of the different styles will need to go into a unit test, so why don't we think about that as a format? Something like TestElasticSearchLogStashEventSerializer.
          Even if you, and others, can't be bothered putting things into a Java format, we can probably deal with that. It would, however, run the risk of not being faithful to the actual problem, and it makes for more work.

          I think we need to think through how much auto-sensing we might want to do. It's risky and probably slow.

          That said, I want to collect the full gamut of data to see what we can do.

          Xuri Nagarin added a comment -

          Edward Sargisson Come to think of it, I'd rather leave the ES sink simple and reliable (and its input interface well defined) so people can do their data massaging/formatting in some sort of interceptor that fits their ingest schemes. Doing the massaging in the sink isn't going to save many cycles, since the massaging has to be done somewhere, but doing it in an interceptor will keep the sink clean, I believe. Maybe simply document the sink/serializer behaviour better, with fair warning that if the user does not deploy an interceptor to clean up the data, the sink could blow up when the data isn't legible to ElasticSearch?

          Francis added a comment -

          I think that even before thinking about how this could be improved, the sink should be fixed to make sure JSON data is not escaped. As I said before, with the patch, if an event contains a JSON header or body, it will be escaped and ES will treat it like a string.

          For example, with a body like this:

          {"foo":"bar"}

          the resulting document in ES will be:

          {"@message": "{\"foo\":\"bar\"}"}

          To fix this, I modified the ContentBuilderUtil.addComplexField() method. I changed this:

          builder.field(fieldName, tmp.string());

          to this:

          builder.rawField(fieldName, data);

          The rawField() method adds the data as-is instead of escaping it. I'm not sure if it's the best solution, but it works well for me.

          Xuri Nagarin added a comment -

          I am using morphlines from Kite SDK to handle JSON. My point is: sure, you can make a minor code change in the sink to handle JSON, and that will handle our use case. But what if the body has some structured format other than JSON? Wouldn't you then have to add logic to first detect the structure (JSON, XML, etc.) and then apply the corresponding processing to match what ES expects? That might make the sink unnecessarily complex.

          morphlines : [
            {
              id : morphline1
              importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

              commands : [
                { readJson {} }

                {
                  extractJsonPaths {
                    flatten : true
                    paths : {
                      appname : /appname
                      serviceType : /serviceType
                      authStatus : /authStatus
                      uuid : /uuid
                      service : /service
                      rawMsg : /rawMsg
                      clientSrcPort : /clientSrcPort
                      user : /user
                      hostname : /hostname
                      ts : /ts
                      sshProtocol : /sshProtocol
                      environment : /environment
                      logType : /logType
                      sshAuthMethod : /sshAuthMethod
                      rawTimestamp : /rawTimestamp
                      normalizedTimeStamp : /normalizedTimeStamp
                      regexMatch : /regexMatch
                      clientIp : /clientIp
                      syslogHostIp : /syslogHostIp
                      logAgent : /logAgent
                    }
                  }
                }

                {
                  generateUUID {
                    field : uuid
                    preserveExisting : true
                  }
                }

                { setValues { _attachment_body : "@{rawMsg}" } }

                { toByteArray { field : _attachment_body } }
              ]
            }
          ]

          Francis added a comment - edited

          I'm not sure I understand your point. The sink is already doing this content-type detection. Look at ContentBuilderUtil:

          XContentType contentType = XContentFactory.xContentType(data);
          if (contentType == null) {
            addSimpleField(builder, field, data);
          } else {
            addComplexField(builder, field, contentType, data);
          }
          

          The ES XContentFactory.xContentType() method tries to detect the data format. Then, in the addComplexField() method, a parser corresponding to the detected type is instantiated. The XContentFactory supports the following content types: JSON, YAML and Smile. Everything else will be added as-is as a text field.

          I'm not proposing to handle every content type. I just want the current code to work as I believe it was originally intended to. The sink handles the content types that are already supported by the ES SDK, and that's it. Nothing more.
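
          To make the difference concrete, here is a minimal sketch in plain Java (no ES dependency; the @message field name and the escapeJson helper are illustrative assumptions, not the sink's actual code) of what the broken and fixed paths produce for the same JSON event body:

```java
public class ComplexFieldDemo {
    // Hypothetical minimal JSON string escaping (backslashes and quotes only),
    // standing in for what a jsonBuilder does when given a plain string value.
    public static String escapeJson(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // A JSON event body, as the sink might receive it.
        String body = "{\"user\":\"alice\",\"status\":\"ok\"}";

        // Broken path: the body is re-serialized as one escaped text value,
        // so ES sees a single opaque string field.
        String broken = "{\"@message\":\"" + escapeJson(body) + "\"}";

        // Fixed path: the parsed structure is copied into the document,
        // so the body becomes a real nested object that ES can index per field.
        String fixed = "{\"@message\":" + body + "}";

        System.out.println(broken);
        System.out.println(fixed);
    }
}
```

          The first form is what the current sink emits for JSON bodies; the second is what the detection logic above was presumably meant to produce.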

          Francis added a comment -

          I think I found how to fix this bug once and for all. The root of the problem is the way a "complex field" is added. The ES XContent classes are used to parse the data in the detected format, but then, instead of adding the parsed structure, the string() method is called, which converts it back to a string identical to the initial data! Here is the current code with added comments:

          XContentBuilder tmp = jsonBuilder(); // This tmp builder is completely useless.
          parser = XContentFactory.xContent(contentType).createParser(data);
          parser.nextToken();
          tmp.copyCurrentStructure(parser); // This copies the whole parsed data in this tmp builder.
          // Here, by calling tmp.string(), we get the parsed data converted back to a string.
          // This means that tmp.string() == String(data)!
          // All this parsing for nothing...
          // And then, as the field(String, String) method is called on the builder, and the builder being a jsonBuilder,
          // the string will be escaped according to the JSON specifications. 
          builder.field(fieldName, tmp.string());
          

          If we really want to take advantage of the XContent classes, we have to add the parsed data to the builder. Doing so is as simple as:

          parser = XContentFactory.xContent(contentType).createParser(data);
          parser.nextToken();
          // Add the field name, but not the value.
          builder.field(fieldName);
          // This will add the whole parsed content as the value of the field.
          builder.copyCurrentStructure(parser);
          

          I tried this and it works as expected. This is almost the same as the initial workaround posted in this bug's description. I don't understand why it wasn't used as a starting point to fix this bug.

          Xuri Nagarin added a comment -

          I was arguing only from a devil's advocate point of view. If this patch takes JSON from the Flume event body and posts it to ES cleanly, then I couldn't be happier, since I could dump the morphlines interceptor. I will upvote your patch.

          Jarek Jarcec Cecho added a comment -

          Since the patch on this JIRA has been committed and not reverted, may I suggest that we either:

          1) Resolve this JIRA and move the open questions to a subsequent discussion, or
          2) Revert the committed patch.

          Jarek Jarcec Cecho added a comment -

          I didn't get any response on the JIRA, so I'll return it to the "resolved" state. Let's take any subsequent discussion to a separate JIRA.

          Francis added a comment -

          I created a new bug: https://issues.apache.org/jira/browse/FLUME-2649
          Hide
          Edward Sargisson added a comment -

          To users of the elasticsearch sink who have cases where it fails:
          There is a proposed fix in FLUME-2649 (patch #5, Review 1: https://reviews.apache.org/r/32770/, Review 2: https://reviews.apache.org/r/33215/).

          If you have time, please check that your particular use case is covered in the tests with the result you expect. My review of the cases in this work item suggests that it will, but it would be good to be sure.

          Note that I'd like to get this closed in the next day or so, if possible.


            People

            • Assignee: Unassigned
            • Reporter: Massimo Paladin
            • Votes: 2
            • Watchers: 14