Flume / FLUME-2254

Improve log when there is a failure in a BulkRequest in ES Sink and avoid stuck failures in the channel


Details

    • Type: Bug
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None

    Description

      I found two problems that occur when a BulkRequest to ElasticSearch fails in the ElasticSearch sink.

      The first is that if a single insertion inside the bulk fails, all the events of the BulkRequest get stuck in the channel, including the ones that did not fail, because one failing item makes the whole request fail. Since Flume keeps retrying the batch, the events without errors are inserted into ES again and again, and our indexes keep growing.
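To illustrate the first problem, here is a simplified, self-contained model. It is not the Flume or ElasticSearch API; the class and method names are hypothetical. It assumes that the bulk indexes every item except the bad one, that throwing EventDeliveryException rolls the transaction back so the whole batch returns to the head of the channel, and that every successful insert creates a new document (auto-generated IDs):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a Flume-style channel transaction feeding a
// non-idempotent index. Not the real Flume or ElasticSearch classes.
public class StuckBatchDemo {

    /** Runs up to `attempts` sink iterations and returns the number of documents indexed. */
    static int simulate(List<String> events, String poison, int batchSize, int attempts) {
        Deque<String> channel = new ArrayDeque<>(events);
        List<String> index = new ArrayList<>(); // each successful insert adds a new document
        for (int attempt = 0; attempt < attempts; attempt++) {
            // txn.begin() + channel.take(): take up to batchSize events
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < batchSize && !channel.isEmpty(); i++) {
                batch.add(channel.pollFirst());
            }
            if (batch.isEmpty()) {
                break;
            }
            // Bulk request: items are handled independently, so good items are indexed
            boolean hasFailures = false;
            for (String event : batch) {
                if (event.equals(poison)) {
                    hasFailures = true;
                } else {
                    index.add(event);
                }
            }
            if (hasFailures) {
                // EventDeliveryException -> rollback: the whole batch goes back to the
                // head of the channel in its original order
                for (int i = batch.size() - 1; i >= 0; i--) {
                    channel.addFirst(batch.get(i));
                }
            }
            // otherwise txn.commit(): the batch leaves the channel
        }
        return index.size();
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "b", "bad", "c");
        System.out.println(simulate(events, "bad", 4, 5)); // 15
        System.out.println(simulate(events, null, 4, 5));  // 4
    }
}
```

With one bad event in a batch of four, five sink iterations leave 15 documents in the index instead of 3, and the bad event never leaves the channel.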

      The second is that it's difficult to see in the logs which events are causing the entire request to fail. Right now you only see the error output from ElasticSearch, and it would be very useful to know which events are not being inserted.
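For the second problem, an alternative to re-sending every event (not what the proposed patch does) is to read the per-item results of the bulk response: the ES Java client's BulkResponse exposes them through getItems(), and an item's position in the bulk matches the order in which the events were added. The self-contained sketch below uses a hypothetical ItemResult record (item id, failed flag, failure message) in place of the real client classes and requires Java 16+:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkFailureReport {

    // Hypothetical stand-in for one per-item bulk result; not an ElasticSearch class.
    record ItemResult(int itemId, boolean failed, String failureMessage) {}

    /** Builds one log line per failed item, pairing the ES error with the event body. */
    static List<String> describeFailures(List<String> eventBodies, List<ItemResult> items) {
        List<String> lines = new ArrayList<>();
        for (ItemResult item : items) {
            if (item.failed()) {
                // itemId is the request's position in the bulk, i.e. the index of the
                // event in the list built while filling the bulk request
                String body = eventBodies.get(item.itemId());
                lines.add("Failed to index event #" + item.itemId() + ": "
                        + item.failureMessage() + " | event: " + body);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> bodies = List.of("{\"msg\":\"ok\"}", "{\"ts\":\"not-a-date\"}");
        List<ItemResult> items = List.of(
                new ItemResult(0, false, null),
                new ItemResult(1, true, "simulated mapping error"));
        describeFailures(bodies, items).forEach(System.out::println);
    }
}
```

Because this only reads the response, events that the bulk already indexed are not sent a second time.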

      My proposal to solve this is:

      • Save all the Events of the BulkRequest in some structure.
      • Stop throwing EventDeliveryException when the bulk response has failures.
      • If there is a failure, iterate over the events, sending a separate request to ES for each one, to find out which events cannot be inserted. For each event that fails, log the error together with the event.
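The three proposed steps can be sketched as follows, under simplifying assumptions: events are plain strings, and the two Predicate parameters are hypothetical stand-ins for the bulk request (true when the response has no failures) and for a single-document index request (false instead of an exception when ES rejects the document):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FallbackIndexing {

    /**
     * One sink iteration following the proposal. Returns the events that could
     * not be indexed; the caller commits the transaction afterwards either way.
     */
    static List<String> processBatch(List<String> batch,
                                     Predicate<List<String>> bulkIndex,
                                     Predicate<String> indexOne) {
        // Step 1: keep the events, in the order they were added to the bulk
        List<String> events = new ArrayList<>(batch);
        List<String> rejected = new ArrayList<>();
        if (!bulkIndex.test(events)) {
            // Step 2: no EventDeliveryException is thrown here
            // Step 3: send each event on its own to find the ones ES rejects
            for (String event : events) {
                if (!indexOne.test(event)) {
                    System.err.println("There is an error with the following event: " + event);
                    rejected.add(event);
                }
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        Predicate<String> accepts = e -> !e.contains("bad");
        Predicate<List<String>> bulk = es -> es.stream().allMatch(accepts);
        System.out.println(processBatch(List.of("a", "bad-1", "b"), bulk, accepts)); // [bad-1]
    }
}
```

Since the transaction is committed after this step, rejected events are logged and then leave the channel. If document IDs are auto-generated, the events that already succeeded in the bulk are indexed a second time by the per-event requests.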

      Here is an example of how this can be implemented. It's not the smartest way to do it, but it works for me:

      @@ -81,6 +80,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
           try {
             txn.begin();
             BulkRequestBuilder bulkRequest = client.prepareBulk();
      +      LinkedList<Event> events = new LinkedList<Event>();
      +
             for (int i = 0; i < batchSize; i++) {
               Event event = channel.take();
       
      @@ -88,14 +89,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
                 break;
               }
       
               IndexRequestBuilder indexRequest =
                   indexRequestFactory.createIndexRequest(
                       client, indexName, indexType, event);
       
               if (ttlMs > 0) {
                 indexRequest.setTTL(ttlMs);
               }
       
      +        events.add(event);
               bulkRequest.add(indexRequest);
             }
      
      
      @@ -116,7 +117,35 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
       
               BulkResponse bulkResponse = bulkRequest.execute().actionGet();
               if (bulkResponse.hasFailures()) {
      -          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
      +          logger
      +              .warn("There is a failure in the bulk request with this output\n"
      +                  + bulkResponse.buildFailureMessage());
      +          logger
      +              .warn("Trying to do the requests separately to know what notifications"
      +                  + " of the requests have errors");
      +          for (Event event : events) {
      +            try {
      +              indexRequestFactory
      +                  .createIndexRequest(client, indexName, indexType, event)
      +                  .execute().actionGet();
      +            } catch (Exception e) {
      +              logger.error(e.getMessage());
      +
      +              if (serializer instanceof ElasticSearchEventSerializer) {
      +                XContentBuilder builder = (XContentBuilder) ((ElasticSearchEventSerializer) serializer)
      +                    .getContentBuilder(event);
      +                logger
      +                    .error("There is an error with the following notification:\n"
      +                        + builder.string());
      +              } else {
      +                logger
      +                    .error("There is no possibility of printing the notification because"
      +                        + " the serializer is a different instance from"
      +                        + " ElasticSearchEventSerializer");
      +              }
      +
      +            }
      +          }
               }
             }
             txn.commit();
      


          People

            Assignee: Ashish Paliwal (paliwalashish)
            Reporter: Luis Pigueiras (lpigueir)