BEAM-10945

ElasticsearchIO performs 0 division on DirectRunner

    Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.23.0
    • Fix Version/s: None
    • Labels:
      None
    • Environment:
      Beam 2.23
      Java 1.8.0_265
      Ubuntu 16.04
      Elasticsearch cluster version 7.9.1, cross-cluster setup
      DirectRunner parallelism: 8

      Description

      Environment configuration

      At my company we use an Elasticsearch cross-cluster setup for search. The cluster version is 7.9.1.

      I intended to use ElasticsearchIO for reading application logs and subsequently producing some aggregated data.

      Problem description

      1. In a cross-cluster ES setup, the /<index>/_stats API is not available, so ElasticsearchIO#getEstimatedSizeBytes cannot be computed properly.
      2. The statsJson returned by the cluster looks like this:

        {
          "_shards" : { … },
          "_all" : {
            "primaries" : { … },
            "total" : { }
          },
          "indices" : { }
        }

      3. The totalCount value therefore cannot be parsed from the JSON and is set to 0.
      4. As a result, estimatedByteSize is set to 1 (itself a workaround for a similar issue).
      5. ElasticsearchIO#getEstimatedSizeBytes is used in BoundedReadEvaluatorFactory#getInitialInputs, which does not check the value and divides it by the target parallelism using long arithmetic. That of course yields 0 for any targetParallelism > 1.
      6. ElasticsearchIO#split is then called with indexSize = 1 and desiredBundleSizeBytes = 0, which sets nbBundlesFloat to infinity.
      7. Even though the number of bundles is capped at 1024, reading from 1024 BoundedElasticsearchSources concurrently makes ElasticsearchIO virtually impossible to use on the direct runner.
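
The arithmetic in steps 5 to 7 can be reproduced in isolation. The following is only a minimal sketch, not the Beam source: the class and method names are hypothetical, and the float division, ceiling, and 1024 cap are modelled on the description above rather than copied from ElasticsearchIO.

```java
// Hypothetical stand-alone sketch; names are illustrative, not Beam APIs.
class BundleMathSketch {

    // Step 5: plain long division, as described for getInitialInputs.
    static long bytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        return estimatedSizeBytes / targetParallelism;
    }

    // Steps 6 and 7: float division, ceiling, then the cap of 1024 bundles.
    static int bundleCount(long indexSize, long desiredBundleSizeBytes) {
        // Float division by zero does not throw; it produces Infinity.
        float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
        // Casting an infinite value to int saturates at Integer.MAX_VALUE.
        int nbBundles = (int) Math.ceil(nbBundlesFloat);
        return Math.min(nbBundles, 1024);
    }

    public static void main(String[] args) {
        long desired = bytesPerBundle(1L, 8);         // estimated size 1, parallelism 8
        System.out.println(desired);                  // prints 0
        System.out.println((float) 1L / desired);     // prints Infinity
        System.out.println(bundleCount(1L, desired)); // prints 1024
    }
}
```

No exception is thrown anywhere along this path, which is why the problem shows up as excessive parallelism rather than as an ArithmeticException.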

      Resolution suggestion

      I still haven't tested reading from ElasticsearchIO on a proper runner (we use Flink 1.10.2), so I can neither confirm nor deny that it works with our Elasticsearch setup. For now I'm only suggesting a few checks on input values, so that the zero-division and unnecessary-parallelism problems are eliminated on the direct runner.
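
One possible shape for such checks is sketched below. This is an assumption about what the guards could look like, not a patch against Beam: the class, the method names, and the choice to fall back to a single bundle when the inputs are unusable are all hypothetical.

```java
// Hypothetical guards; not part of Beam or ElasticsearchIO.
class GuardedBundleMath {

    static final int MAX_BUNDLES = 1024;

    // Never hand a bundle size below 1 byte to split().
    static long safeBytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        if (targetParallelism < 1) {
            throw new IllegalArgumentException("targetParallelism must be >= 1");
        }
        return Math.max(1L, estimatedSizeBytes / targetParallelism);
    }

    // Fall back to a single bundle when either input is not positive.
    static int safeBundleCount(long indexSize, long desiredBundleSizeBytes) {
        if (indexSize <= 0 || desiredBundleSizeBytes <= 0) {
            return 1;
        }
        double nbBundles = Math.ceil((double) indexSize / desiredBundleSizeBytes);
        return (int) Math.min(nbBundles, MAX_BUNDLES);
    }

    public static void main(String[] args) {
        long desired = safeBytesPerBundle(1L, 8);
        System.out.println(desired);                      // prints 1
        System.out.println(safeBundleCount(1L, desired)); // prints 1
        System.out.println(safeBundleCount(1L, 0L));      // prints 1
    }
}
```

With guards like these, an unknown index size of 1 byte would produce a single source instead of 1024 concurrent ones. Whether one bundle is the right fallback for a cross-cluster index of unknown size is a separate design question.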

            People

            • Assignee: Unassigned
            • Reporter: DraCzech Milan Nikl