Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Version: 1.8.1
Description
When using the Elasticsearch connector's failure handler (from the official example) to re-add documents, Flink throws a ConcurrentModificationException.
input.addSink(new ElasticsearchSink<>(
    config,
    transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                              Throwable failure,
                              int restStatusCode,
                              RequestIndexer indexer) throws Throwable {
            if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                // full queue; re-add document for indexing
                indexer.add(action);
            }
        }
    }));
I found that the method BufferingNoOpRequestIndexer#processBufferedRequests iterates over a list of ActionRequest. However, the failure handler keeps re-adding requests to that same list while the buffered requests are being re-indexed, which causes the ConcurrentModificationException.
void processBufferedRequests(RequestIndexer actualIndexer) {
    for (ActionRequest request : bufferedRequests) {
        if (request instanceof IndexRequest) {
            actualIndexer.add((IndexRequest) request);
        } else if (request instanceof DeleteRequest) {
            actualIndexer.add((DeleteRequest) request);
        } else if (request instanceof UpdateRequest) {
            actualIndexer.add((UpdateRequest) request);
        }
    }
    bufferedRequests.clear();
}
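The failure mode can be reproduced outside Flink with a stripped-down sketch (names here are hypothetical, not Flink classes): ArrayList's fail-fast iterator throws ConcurrentModificationException as soon as the list is structurally modified during iteration, even from the same thread.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CmeDemo {

    // Mirrors processBufferedRequests: the "failure handler" re-adds
    // an element to the same list that is currently being iterated.
    static boolean triggersCme() {
        List<String> buffered = new ArrayList<>();
        buffered.add("request-1");
        try {
            for (String request : buffered) {
                buffered.add(request + "-retried"); // structural modification mid-iteration
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("threw CME: " + triggersCme()); // prints "threw CME: true"
    }
}
```

Note that no second thread is involved: the single-threaded re-add alone is enough to trip the fail-fast check.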
I suspect this is a concurrency bug. Would it be acceptable to use a concurrent queue to hold the failed requests?
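The concurrent-queue idea could look like the following sketch (again with hypothetical stand-in names, not the actual Flink fix): draining a ConcurrentLinkedQueue with poll() avoids the fail-fast iterator entirely, and requests re-added during the drain are simply picked up on a later poll.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDrainDemo {

    // Hypothetical stand-in for the buffered ActionRequest list.
    static final Queue<String> bufferedRequests = new ConcurrentLinkedQueue<>();

    // Drains with poll() instead of an iterator, so requests re-added
    // while draining cause no ConcurrentModificationException.
    static int processBufferedRequests() {
        int processed = 0;
        String request;
        while ((request = bufferedRequests.poll()) != null) {
            processed++;
            // Simulate the failure handler re-adding each request once.
            if (!request.endsWith("-retried")) {
                bufferedRequests.add(request + "-retried");
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        bufferedRequests.add("req-1");
        bufferedRequests.add("req-2");
        System.out.println(processBufferedRequests()); // prints 4: each request plus its retry
    }
}
```

One design caveat with this approach: if the failure handler re-adds unconditionally, the drain loop never terminates, so the real fix would still need some bound or snapshot of the requests to retry.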