  1. Flink
  2. FLINK-14938

Flink Elasticsearch failure handler re-adding an IndexRequest causes ConcurrentModificationException


Details

    Description

       

      When using the Elasticsearch connector's failure handler (from the official example) to re-add documents, Flink throws a ConcurrentModificationException.

      input.addSink(new ElasticsearchSink<>(
          config, transportAddresses,
          new ElasticsearchSinkFunction<String>() {...},
          new ActionRequestFailureHandler() {
              @Override
              public void onFailure(ActionRequest action,
                      Throwable failure,
                      int restStatusCode,
                      RequestIndexer indexer) throws Throwable {

                  if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                      // full queue; re-add document for indexing
                      indexer.add(action);
                  }
              }
          }));
      

      I found that the method BufferingNoOpRequestIndexer#processBufferedRequests iterates over a list of ActionRequest objects. However, the failure handler keeps re-adding requests to that same list after a bulk request, which causes the ConcurrentModificationException.

      void processBufferedRequests(RequestIndexer actualIndexer) {
         for (ActionRequest request : bufferedRequests) {
            if (request instanceof IndexRequest) {
               actualIndexer.add((IndexRequest) request);
            } else if (request instanceof DeleteRequest) {
               actualIndexer.add((DeleteRequest) request);
            } else if (request instanceof UpdateRequest) {
               actualIndexer.add((UpdateRequest) request);
            }
         }
      
         bufferedRequests.clear();
      }
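
      The failure mode described above can be reproduced outside Flink. The following is a minimal sketch, not Flink code: the class BufferedReplayDemo, the method replayThrowsCme, and the string "requests" are hypothetical stand-ins for the buffered ActionRequest list and the failure handler's indexer.add(action) call. It only shows that adding to an ArrayList while a for-each loop is iterating over it trips the fail-fast iterator. Whether the re-add in the connector happens on the same thread or on a bulk callback thread is not established here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class BufferedReplayDemo {

    /**
     * Iterates over the buffered list the way processBufferedRequests does,
     * while a stand-in "failure handler" re-adds an element to the same list.
     * Returns true if the iteration fails with ConcurrentModificationException.
     */
    public static boolean replayThrowsCme(List<String> buffered) {
        try {
            for (String request : buffered) {
                // Stand-in for the failure handler calling indexer.add(action),
                // which appends to the list currently being iterated.
                if (request.equals("rejected")) {
                    buffered.add(request + "-retry");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> buffered =
                new ArrayList<>(Arrays.asList("ok", "rejected", "ok"));
        System.out.println("CME thrown: " + replayThrowsCme(buffered));
    }
}
```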

      I think this is a multi-threading bug. Would it be OK to use a concurrent queue to hold the failed requests?
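
      As a rough illustration of the concurrent-queue idea, here is a minimal sketch under assumptions, not a patch against the connector. ConcurrentBufferSketch and its methods are hypothetical names, and strings stand in for ActionRequest. The buffer is a java.util.concurrent.ConcurrentLinkedQueue that is drained with poll() instead of being iterated and then cleared, so a request re-added during the drain is neither lost nor a cause of ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class ConcurrentBufferSketch {

    // Thread-safe buffer; strings stand in for ActionRequest (assumption).
    private final Queue<String> bufferedRequests = new ConcurrentLinkedQueue<>();

    /** Called by the (hypothetical) failure handler to re-add a request. */
    public void add(String request) {
        bufferedRequests.add(request);
    }

    public int size() {
        return bufferedRequests.size();
    }

    /**
     * Drains the buffer by polling. Requests added while draining are picked
     * up by the same loop, so there is no separate clear() that could drop them.
     */
    public void processBufferedRequests(Consumer<String> actualIndexer) {
        String request;
        while ((request = bufferedRequests.poll()) != null) {
            actualIndexer.accept(request);
        }
    }

    public static void main(String[] args) {
        ConcurrentBufferSketch buffer = new ConcurrentBufferSketch();
        buffer.add("a");
        buffer.add("rejected");

        List<String> sent = new ArrayList<>();
        buffer.processBufferedRequests(r -> {
            sent.add(r);
            if (r.equals("rejected")) {
                buffer.add("retry"); // re-add during the drain
            }
        });
        System.out.println(sent);
    }
}
```

      One caveat with this design: if a request is rejected and re-added on every attempt, the drain loop never terminates, so a real fix would probably need a retry limit or backoff.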

       


            People

              ysn2233 Shengnan YU