  1. Flume
  2. FLUME-2649

Elasticsearch sink doesn't handle JSON fields correctly

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      JSON attributes are treated like normal strings and are escaped by the sink. For example, if the body or a header contains the following value:

      {"foo":"bar"}
      

      It will be added like this in Elasticsearch:

      {"@message": "{\"foo\":\"bar\"}"}
      

      We end up with a plain string instead of a valid JSON field.

      I think I found how to fix this bug. The problem is caused by the way a "complex field" is added. The ES XContent classes are used to parse the data in the detected format, but then, instead of adding the parsed data, the string() method is called, which converts it back to a string equivalent to the initial data. Here is the current code with added comments:

      XContentBuilder tmp = jsonBuilder(); // This tmp builder is completely useless.
      parser = XContentFactory.xContent(contentType).createParser(data);
      parser.nextToken();
      tmp.copyCurrentStructure(parser); // This copies the whole parsed data in this tmp builder.
      // Calling tmp.string() converts the parsed data back to a string,
      // so tmp.string() is equivalent to new String(data): the parsing achieves nothing.
      // Then, because field(String, String) is called on the builder, and the builder is a jsonBuilder,
      // the string is escaped according to the JSON specification.
      builder.field(fieldName, tmp.string());
      

      If we really want to take advantage of the XContent classes, we have to add the parsed data to the builder. To do this, it is as simple as:

      parser = XContentFactory.xContent(contentType).createParser(data);
      parser.nextToken();
      // Add the field name, but not the value.
      builder.field(fieldName);
      // This will add the whole parsed content as the value of the field.
      builder.copyCurrentStructure(parser);
      

      I tried this and it works as expected.
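
      To illustrate the difference outside of Flume, here is a minimal, self-contained sketch in plain Java. It does not use the Elasticsearch XContent API: JsonFieldDemo, escapeJson, asStringField and asStructuredField are hypothetical names introduced only for this example. It assumes the incoming value is already valid JSON, whereas the real sink parses the data first.

```java
public class JsonFieldDemo {

    // Escapes a value according to the JSON string rules
    // (quotes, backslashes and control characters).
    static String escapeJson(String value) {
        StringBuilder out = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    // Mirrors the buggy path: the JSON text is added as an ordinary
    // string value, so every quote inside it gets escaped.
    static String asStringField(String name, String json) {
        return "{\"" + escapeJson(name) + "\":\"" + escapeJson(json) + "\"}";
    }

    // Mirrors the proposed fix: write the field name, then the structure
    // itself as the value. Assumes `json` is already valid JSON.
    static String asStructuredField(String name, String json) {
        return "{\"" + escapeJson(name) + "\":" + json + "}";
    }

    public static void main(String[] args) {
        String body = "{\"foo\":\"bar\"}";
        System.out.println(asStringField("@message", body));
        // prints {"@message":"{\"foo\":\"bar\"}"}
        System.out.println(asStructuredField("@message", body));
        // prints {"@message":{"foo":"bar"}}
    }
}
```

      The first call reproduces the escaped string seen in Elasticsearch today; the second shows the nested object that the field-name-then-structure approach is meant to produce.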

        Attachments

        1. FLUME-2649-0.patch
          5 kB
          Benjamin Fiorini
        2. FLUME-2649-1.patch
          6 kB
          Benjamin Fiorini
        3. FLUME-2649-2.patch
          6 kB
          Benjamin Fiorini
        4. FLUME-2649-3.patch
          5 kB
          Benjamin Fiorini
        5. FLUME-2649-4.patch
          5 kB
          Benjamin Fiorini
        6. FLUME-2649-5.patch
          5 kB
          Benjamin Fiorini
        7. FLUME-2649-6.patch
          5 kB
          Benjamin Fiorini

              People

              • Assignee:
                bfiorini Benjamin Fiorini
                Reporter:
                faelenor Francis
              • Votes:
                2
                Watchers:
                9

                Dates

                • Created:
                  Updated:
                  Resolved: