FLINK-12551

Elasticsearch6 connector prints incorrect error log


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Not A Bug
    • Affects Version/s: 1.6.3
    • None
    • None

    Description

      When I use the Elasticsearch connector and my project is running, I find that some data is not inserted into Elasticsearch. I wanted to read the log for help, but the log does not contain the important message. So I read the source code (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and found an error in how the ERROR log is written.

       

      // Excerpt from ElasticsearchSinkBase, as quoted by the reporter.

      // Called when the bulk request completed but some items may have failed.
      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          if (response.hasFailures()) {
              BulkItemResponse itemResponse;
              Throwable failure;
              RestStatus restStatus;

              try {
                  for (int i = 0; i < response.getItems().length; i++) {
                      itemResponse = response.getItems()[i];
                      failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                      if (failure != null) {
                          LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

                          restStatus = itemResponse.getFailure().getStatus();
                          if (restStatus == null) {
                              failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
                          } else {
                              failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
                          }
                      }
                  }
              } catch (Throwable t) {
                  // fail the sink and skip the rest of the items
                  // if the failure handler decides to throw an exception
                  failureThrowable.compareAndSet(null, t);
              }
          }

          if (flushOnCheckpoint) {
              numPendingRequests.getAndAdd(-request.numberOfActions());
          }
      }

      // Called when the whole bulk request failed with an exception.
      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());

          try {
              for (ActionRequest action : request.requests()) {
                  failureHandler.onFailure(action, failure, -1, requestIndexer);
              }
          } catch (Throwable t) {
              // fail the sink and skip the rest of the items
              // if the failure handler decides to throw an exception
              failureThrowable.compareAndSet(null, t);
          }

          if (flushOnCheckpoint) {
              numPendingRequests.getAndAdd(-request.numberOfActions());
          }
      }
      } // closes the enclosing class (its opening line is not part of this excerpt)
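The description does not say which log statement is meant. One possible reading, offered only as an assumption, is the second afterBulk overload, which passes failure.getCause() rather than failure as the last logging argument. The sketch below uses plain Java with no Flink or logging dependency. It shows that Throwable.getCause() returns null when an exception has no cause, so a caller that forwards only the cause may have nothing to print. The class CauseLoggingSketch and the helper loggedThrowable are hypothetical names introduced for illustration and are not part of the connector.

```java
import java.io.IOException;

public class CauseLoggingSketch {

    // Hypothetical stand-in for a logger's throwable argument: returns the simple class
    // name of the throwable that would be printed, or "none" when null is passed.
    static String loggedThrowable(Throwable t) {
        return t == null ? "none" : t.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // An exception created without a cause, e.g. a plain connection error.
        Throwable noCause = new IOException("connection refused");
        // An exception that wraps another one.
        Throwable wrapped = new RuntimeException("bulk failed", noCause);

        // Forwarding only the cause of an exception that has no cause yields null.
        System.out.println(loggedThrowable(noCause.getCause()));
        // Forwarding the exception itself always yields something to print.
        System.out.println(loggedThrowable(noCause));
        // Forwarding the cause of a wrapped exception yields the inner exception only.
        System.out.println(loggedThrowable(wrapped.getCause()));
    }
}
```

Whether this affects the connector in practice depends on which exceptions the bulk processor hands to the listener, which this sketch does not model. The issue itself was resolved as Not A Bug.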
      

       


          People

            Assignee: Unassigned
            Reporter: sunxiongkun