Flume / FLUME-2254

Improve log when there is a failure in a BulkRequest in ES Sink and avoid stuck failures in the channel

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: v1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None

      Description

      I found two problems that occur when a BulkRequest to ElasticSearch fails in the ElasticSearch sink.

      The first is that if a single insertion inside the bulk fails, all the events from the BulkRequest get stuck in the channel, including the ones that did not fail, because one failure makes the entire request count as failed. Flume then retries the same events again and again, and our indexes grow because the events that have no error are inserted into ES on every retry.

      The second is that it is difficult to see in the logs which events caused the entire request to fail. Currently you can only see the error output from ElasticSearch, and it would be very useful to know which events are not being inserted in ElasticSearch.

      My proposal to solve this is:

      • Save all the Events of the BulkRequest in some structure.
      • Remove the throw of EventDeliveryException.
      • If there is a failure, iterate over the events and send a separate request to ES for each one, to find out which events cannot be inserted in ES. If one of them fails, print the error and the event to the logs.
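
The three steps above can be sketched in isolation as follows. This is only a minimal illustration, not the sink's code: `Indexer`, `BulkFallback`, and `toyIndexer` are hypothetical names that stand in for the ElasticSearch client, events are plain strings, and the toy indexer only validates documents without storing them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the ES client; not part of Flume or ElasticSearch. */
interface Indexer {
  /** Returns false if at least one document in the batch was rejected. */
  boolean indexBulk(List<String> docs);

  /** Indexes a single document; throws if the document is rejected. */
  void indexOne(String doc) throws Exception;
}

class BulkFallback {
  /**
   * Sends the whole batch. If the batch reports a failure, retries every
   * event on its own, logs the ones that are rejected, and returns them.
   * Nothing is thrown, so the caller is free to commit its transaction.
   */
  static List<String> deliver(Indexer indexer, List<String> events) {
    List<String> rejected = new ArrayList<String>();
    if (indexer.indexBulk(events)) {
      return rejected; // whole batch accepted, nothing to report
    }
    for (String event : events) {
      try {
        indexer.indexOne(event);
      } catch (Exception e) {
        System.err.println("Cannot index event [" + event + "]: " + e.getMessage());
        rejected.add(event);
      }
    }
    return rejected;
  }

  /** Toy indexer for the demo: rejects anything that does not start with '{'. */
  static Indexer toyIndexer() {
    return new Indexer() {
      public boolean indexBulk(List<String> docs) {
        for (String doc : docs) {
          if (!doc.startsWith("{")) {
            return false;
          }
        }
        return true;
      }

      public void indexOne(String doc) throws Exception {
        if (!doc.startsWith("{")) {
          throw new Exception("not a JSON object");
        }
      }
    };
  }

  public static void main(String[] args) {
    List<String> batch = Arrays.asList("{\"msg\":\"a\"}", "broken", "{\"msg\":\"b\"}");
    System.out.println("Rejected: " + deliver(toyIndexer(), batch)); // prints "Rejected: [broken]"
  }
}
```

One caveat with this approach: if the failed bulk call has already stored the valid documents, sending them again one by one would store them a second time unless the documents carry stable IDs.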

      Here is an example of how this can be implemented. It is not the smartest way to do it, but it works for me.

      @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
           try {
             txn.begin();
             BulkRequestBuilder bulkRequest = client.prepareBulk();
      +      LinkedList<Event> events = new LinkedList<Event>();
      +
             for (int i = 0; i < batchSize; i++) {
               Event event = channel.take();
       
      @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
                 break;
               }
       
              IndexRequestBuilder indexRequest =
                  indexRequestFactory.createIndexRequest(
                      client, indexName, indexType, event);
       
               if (ttlMs > 0) {
                 indexRequest.setTTL(ttlMs);
               }
       
      +        events.add(event);
               bulkRequest.add(indexRequest);
             }
      
      
      @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
       
               BulkResponse bulkResponse = bulkRequest.execute().actionGet();
               if (bulkResponse.hasFailures()) {
      -          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
      +          logger
      +              .warn("There is a failure in the bulk request with this output\n"
      +                  + bulkResponse.buildFailureMessage());
      +          logger
      +              .warn("Trying to do the requests separately to know what notifications"
      +                  + " of the requests have errors");
      +          for (Event event : events) {
      +            try {
      +              indexRequestFactory
      +                  .createIndexRequest(client, indexName, indexType, event)
      +                  .execute().actionGet();
      +            } catch (Exception e) {
      +              logger.error(e.getMessage());
      +
      +              if (serializer instanceof ElasticSearchEventSerializer) {
      +                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
      +                    .getContentBuilder(event);
      +                logger
      +                    .error("There is an error with the following notification:\n"
      +                        + builder.string());
      +              } else {
      +                logger
      +                    .error("There is no possibility of printing the notification because"
      +                        + " the serializer is a different instance from"
      +                        + " ElasticSearchEventSerializer");
      +              }
      +
      +            }
      +          }
               }
             }
             txn.commit();
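
A possible variation on the patch above, sketched under assumptions: the ElasticSearch Java client exposes per-item results on a bulk response (`BulkResponse.getItems()`, with `BulkItemResponse.isFailed()` and `getFailureMessage()`), so the failing events could be identified without sending each event a second time. The sketch below uses a hypothetical `ItemResult` stand-in instead of the real client classes so that it runs on its own, and it assumes the results come back in the same order as the `events` list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for one per-item entry of a bulk response. */
class ItemResult {
  final boolean failed;
  final String failureMessage; // null when the item succeeded

  ItemResult(boolean failed, String failureMessage) {
    this.failed = failed;
    this.failureMessage = failureMessage;
  }
}

class BulkFailureReport {
  /**
   * Pairs each event with the result at the same position and returns one
   * log line per failed item. Assumes results are in request order.
   */
  static List<String> describeFailures(List<String> events, List<ItemResult> results) {
    if (events.size() != results.size()) {
      throw new IllegalArgumentException("events and results must have the same size");
    }
    List<String> lines = new ArrayList<String>();
    for (int i = 0; i < results.size(); i++) {
      ItemResult result = results.get(i);
      if (result.failed) {
        lines.add("Event #" + i + " failed (" + result.failureMessage + "): " + events.get(i));
      }
    }
    return lines;
  }

  public static void main(String[] args) {
    List<String> events = Arrays.asList("{\"msg\":\"a\"}", "broken", "{\"msg\":\"b\"}");
    List<ItemResult> results = Arrays.asList(
        new ItemResult(false, null),
        new ItemResult(true, "mapping error"),
        new ItemResult(false, null));
    for (String line : describeFailures(events, results)) {
      System.out.println(line); // prints "Event #1 failed (mapping error): broken"
    }
  }
}
```

With the real client, the serialized event body could be logged in place of the plain string used here.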
      

        Activity

        Ashish Paliwal added a comment -

        This looks like an expectation of exactly-once semantics in case of an error.

        Hari Shreedharan AFAIK, we don't have exactly-once semantics in Flume, and duplicate events may be present in failure scenarios. Do we want to fix this?

        Ashish Paliwal added a comment -

        Planning to spend some time with ES; will give this a try.


          People

          • Assignee: Ashish Paliwal
          • Reporter: Luis Pigueiras
          • Votes: 0
          • Watchers: 3
