BEAM-6052

ElasticsearchIO checkForErrors method bug

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.12.0
    • Component/s: io-java-elasticsearch
    • Labels:
    • Environment:
      beam-sdk-java-io-elasticsearch-2.8.0

      Description

      When I use Write to send update operations in a bulk request to Elasticsearch, the following exception appears:

      Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
      

      I checked the checkForErrors method and found that it cannot parse a response that contains update items. After I added logic to parse update items, the output looks like this:

      Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
      Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
      Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
      Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
      Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
          at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
      

      The response content looks like this:

      {
          "took": 293,
          "errors": true,
          "items": [
              {
                  "update": {
                      "_index": "test_kevin_2018-11",
                      "_type": "kevin",
                      "_id": "8d7664286c0887c637229166c7c613bc",
                      "_version": 1,
                      "result": "noop",
                      "_shards": {
                          "total": 1,
                          "successful": 1,
                          "failed": 0
                      },
                      "status": 200
                  }
              },
              {
                  "update": {
                      "_index": "test_kevin_2018-11",
                      "_type": "kevin",
                      "_id": "49952be98f4fc160f56bcdb33b1dbf7e",
                      "status": 429,
                      "error": {
                          "type": "es_rejected_execution_exception",
                          "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]"
                      }
                  }
              }
          ]
      }
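
      To illustrate the kind of parsing described above, here is a minimal sketch. It is not the actual Beam fix. It assumes the bulk response has already been parsed into nested maps (for example by a JSON library), and the class name BulkErrorCheck and the methods collectErrors, item and sampleResponse are hypothetical names chosen for this example. The idea is to look up each item under every bulk action key, including "update", rather than under a single key, and to report the failed documents in the same "Document id ...: reason (type)" shape as the output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Beam's ElasticsearchIO.
class BulkErrorCheck {
  // Action keys that can wrap an item in a bulk response.
  private static final List<String> ACTIONS =
      Arrays.asList("index", "create", "update", "delete");

  /** Returns one message per failed item; an empty list means no item reported an error. */
  @SuppressWarnings("unchecked")
  static List<String> collectErrors(Map<String, Object> response) {
    List<String> messages = new ArrayList<>();
    // The top-level "errors" flag is true when at least one item failed.
    if (!Boolean.TRUE.equals(response.get("errors"))) {
      return messages;
    }
    List<Map<String, Object>> items = (List<Map<String, Object>>) response.get("items");
    if (items == null) {
      return messages;
    }
    for (Map<String, Object> item : items) {
      for (String action : ACTIONS) {
        Map<String, Object> result = (Map<String, Object>) item.get(action);
        if (result == null) {
          continue; // this item was not wrapped in this action key
        }
        Map<String, Object> error = (Map<String, Object>) result.get("error");
        if (error != null) {
          messages.add(String.format("Document id %s: %s (%s)",
              result.get("_id"), error.get("reason"), error.get("type")));
        }
      }
    }
    return messages;
  }

  /** Builds one response item wrapped in the given action key. */
  static Map<String, Object> item(String action, String id, int status, Map<String, Object> error) {
    Map<String, Object> body = new LinkedHashMap<>();
    body.put("_id", id);
    body.put("status", status);
    if (error != null) {
      body.put("error", error);
    }
    Map<String, Object> wrapper = new LinkedHashMap<>();
    wrapper.put(action, body);
    return wrapper;
  }

  /** A cut-down version of the response in this report (the reason text is shortened). */
  static Map<String, Object> sampleResponse() {
    Map<String, Object> error = new LinkedHashMap<>();
    error.put("type", "es_rejected_execution_exception");
    error.put("reason", "rejected execution (shortened for the example)");
    Map<String, Object> response = new LinkedHashMap<>();
    response.put("took", 293);
    response.put("errors", true);
    response.put("items", Arrays.asList(
        item("update", "8d7664286c0887c637229166c7c613bc", 200, null),
        item("update", "49952be98f4fc160f56bcdb33b1dbf7e", 429, error)));
    return response;
  }

  public static void main(String[] args) {
    for (String message : collectErrors(sampleResponse())) {
      System.out.println(message);
    }
  }
}
```

      With the sample data, only the second item (status 429) produces a message. The first item is a successful "noop" update and is skipped.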

            People

            • Assignee: Etienne Chauchot (echauchot)
            • Reporter: Fred k (kevin_123)
            • Votes: 0
            • Watchers: 2

                Time Tracking

                • Estimated: Not Specified
                • Remaining: 0h
                • Logged: 9h 20m