
DRILL-7686: Excessive memory use in partition sender


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.0

    Description

      The Partition Sender in Drill is responsible for taking a batch from fragment x and sending its rows to all other fragments f1, f2, ..., fn. For example, when joining, fragment x might read a portion of a file, hash the join key, and partition rows by hash key among the receiving fragments that join rows with that same key.

      Since Drill is columnar, the sender needs to send a batch of columns to each receiver. To be efficient, that batch should contain a reasonable number of rows. The current default is 1024.

      Drill creates buffers in each sender, one per receiver, to gather the rows. Thus, each sender needs n buffers: one for each of the n receivers.

      Because Drill is symmetrical, there are n senders (scans). Since each maintains n send buffers, we have a total of n^2 buffers. That is, the amount of memory used by the partition sender grows with the square of the degree of parallelism for a query.
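
      Below is a minimal sketch of the scheme this implies; the names are hypothetical (this is illustrative, not Drill's actual partition-sender code). Each sender holds one buffer per receiver and ships a buffer only when it fills, so partially filled buffers stay resident:

      {code:java}
      import java.util.ArrayList;
      import java.util.List;

      // Illustrative only: one sender fragment's view. With n senders and
      // n receivers, the cluster holds n * n of these buffers in total.
      class PartitionSenderSketch {
        static final int BATCH_ROWS = 1024;        // current default batch size

        private final List<List<Object>> outgoing; // one buffer per receiver

        PartitionSenderSketch(int receiverCount) {
          outgoing = new ArrayList<>(receiverCount);
          for (int i = 0; i < receiverCount; i++) {
            outgoing.add(new ArrayList<>(BATCH_ROWS));
          }
        }

        void route(Object row, int hashKey) {
          List<Object> buffer = outgoing.get(Math.floorMod(hashKey, outgoing.size()));
          buffer.add(row);
          if (buffer.size() >= BATCH_ROWS) {
            send(buffer);   // ship only when the batch is full
            buffer.clear();
          }
          // Unfilled buffers remain in memory until they fill or the
          // sender finishes; this is the footprint quantified below.
        }

        private void send(List<Object> batch) { /* network send elided */ }

        public static void main(String[] args) {
          PartitionSenderSketch sender = new PartitionSenderSketch(4);
          sender.route("row", 42);   // lands in buffer 42 % 4 = 2
        }
      }
      {code}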

      In addition, as seen in DRILL-7675, the size of the buffers is controlled not by Drill, but by the incoming data. The query in DRILL-7675 had a row with 260+ fields, some of which were map arrays.

      The result is that the query, which processes 2 MB of data, runs out of memory when many GB are available. Drill is simply doing the math: n^2 buffers, each with 1024 rows, each row with 260+ fields, many with a cardinality of 5x the row count (or 25x or 125x, depending on array nesting depth). The result is a very large memory footprint.
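
      As a back-of-envelope check (a sketch, not a measurement: the 1024-row default and the 260-field row come from this issue, while the parallelism of 20, the 8-byte average value width, and the 5x array cardinality are assumed for illustration):

      {code:java}
      public class PartitionSenderFootprint {
        public static void main(String[] args) {
          long fragments = 20;                   // degree of parallelism (assumed)
          long buffers = fragments * fragments;  // n^2 buffers cluster-wide = 400
          long rows = 1024;                      // default rows per outgoing batch
          long fields = 260;                     // from the DRILL-7675 query
          long bytesPerValue = 8;                // assumed average value width
          long cardinality = 5;                  // assumed array expansion factor

          long bytesPerBuffer = rows * fields * bytesPerValue * cardinality;
          long totalBytes = buffers * bytesPerBuffer;
          // Prints roughly 10.6 MB per buffer and about 4 GB in total,
          // for a query whose input is only 2 MB.
          System.out.printf("per buffer: %,d bytes; total: %,d bytes (~%.1f GB)%n",
              bytesPerBuffer, totalBytes, totalBytes / (1024.0 * 1024 * 1024));
        }
      }
      {code}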

      There is no simple bug-fix solution: the design is inherently unbounded. This ticket asks for a new design. Some crude ideas:

      • Use a row-based format for sending to avoid columnar overhead.
      • Send rows as soon as they are available on the sender side; allow the receiver to do buffering.
      • If doing buffering, flush rows after x ms to avoid slowing the system. (The current approach waits for buffers to fill.) See the sketch after this list.
      • Consolidate buffers on each sending node. (This is the Mux/DeMux approach that exists in the code, but was never well understood and has its own concurrency and memory-ownership problems.)
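
      The time-based flush idea might look like the following sketch (hypothetical names, assuming a background timer and a per-buffer deadline; not an existing Drill API):

      {code:java}
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;

      class TimedFlushBuffer {
        private final int capacity;
        private final long flushMs;
        private final List<Object> rows = new ArrayList<>();
        private long firstRowMs;

        TimedFlushBuffer(int capacity, long flushMs) {
          this.capacity = capacity;
          this.flushMs = flushMs;
        }

        synchronized void add(Object row) {
          if (rows.isEmpty()) {
            firstRowMs = System.currentTimeMillis();
          }
          rows.add(row);
          if (rows.size() >= capacity) {
            flush();                  // normal full-buffer send
          }
        }

        // Called periodically by a timer; ships a partial batch once the
        // oldest buffered row has waited longer than the deadline.
        synchronized void flushIfStale() {
          if (!rows.isEmpty() && System.currentTimeMillis() - firstRowMs >= flushMs) {
            flush();
          }
        }

        private void flush() {
          // Network send elided; in Drill this would ship a smaller record batch.
          rows.clear();
        }

        public static void main(String[] args) throws InterruptedException {
          TimedFlushBuffer buffer = new TimedFlushBuffer(1024, 100);
          ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
          timer.scheduleAtFixedRate(buffer::flushIfStale, 100, 100, TimeUnit.MILLISECONDS);
          buffer.add("row");          // a single row flushes ~100 ms later
          Thread.sleep(250);
          timer.shutdown();
        }
      }
      {code}

      The tradeoff is smaller, more frequent batches under light load in exchange for a bounded wait. Memory is still held per receiver, so this complements rather than replaces a fix for the n^2 growth.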


            People

              Assignee: Unassigned
              Reporter: Paul Rogers
