Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: Impala 2.6.0
    • Fix Version/s: Impala 2.8.0
    • Component/s: Backend
    • Labels:
      None

      Description

      Currently, the build and probe sides of all join types are implemented in a single class that interleaves building and probing when spilling.

      We should separate out the build side to make it more flexible for multi-threading.

        Issue Links

          Activity

          tarmstrong Tim Armstrong added a comment -

          IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder

           The main outcome of this patch is to split PhjBuilder out of
           PartitionedHashJoinNode; the new PhjBuilder class manages the build
           partitions for the join and implements the DataSink interface.
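
           For orientation, here is a minimal sketch of the resulting split,
           using simplified stand-in types (the signatures below are
           hypothetical, not the exact Impala API):

             #include <vector>

             // Simplified stand-ins for the real Impala types; illustration only.
             struct Status { static Status OK() { return Status(); } };
             struct RuntimeState {};
             struct RowBatch {};
             struct Partition {};

             // The DataSink interface that PhjBuilder now implements (simplified).
             class DataSink {
              public:
               virtual ~DataSink() {}
               virtual Status Send(RuntimeState* state, RowBatch* batch) = 0;
               virtual Status FlushFinal(RuntimeState* state) = 0;
             };

             // Owns and manages the build partitions, split out of the join node.
             class PhjBuilder : public DataSink {
              public:
               Status Send(RuntimeState* state, RowBatch* batch) override {
                 // Hash-partition build rows; may spill partitions under memory pressure.
                 return Status::OK();
               }
               Status FlushFinal(RuntimeState* state) override {
                 // Build in-memory hash tables for partitions that did not spill.
                 return Status::OK();
               }
              private:
               std::vector<Partition> build_partitions_;
             };

             // The join node keeps only probe-side state and drives the builder.
             class PartitionedHashJoinNode {
              private:
               PhjBuilder builder_;
             };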

          A lot of this work is fairly mechanical refactoring: dividing the state
          between the two classes and duplicating it where appropriate. One major
          change is that probe partitions need to be separated from build
          partitions.

           This required some significant algorithmic changes to memory
           management for probe partition buffers, which was previously
           entangled with that of the build partitions: small buffers were
           allocated for the probe partitions even for non-spilling joins,
           and the join could spill additional partitions during probing if
           the probe partitions needed to switch from small to I/O buffers.
           The changes made were:

          • Probe partitions are only initialized after the build is partitioned, and
            only for spilled build partitions.
          • Probe partitions never use small buffers: once the initial write
            buffer is allocated, appending to the probe partition never fails.
          • All probe partition allocation is done after partitioning the build
            and before processing the probe input during the same phase as hash
             table building. (Aside from NAAJ partitions, which are
             allocated upfront.)

          The probe partition changes necessitated a change in
          BufferedTupleStream: allocation of write blocks is now explicit via the
          PrepareForWrite() API.
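
           As a hedged illustration of the calling pattern this enables
           (stand-in types; the real BufferedTupleStream API takes more
           parameters and reports richer errors):

             // Simplified stand-ins, for illustration only.
             struct Status {
               bool ok() const { return true; }
               static Status OK() { return Status(); }
             };
             struct TupleRow {};

             class BufferedTupleStream {
              public:
               // New: the initial write block is allocated explicitly, so an
               // allocation failure surfaces here, not on a later append.
               Status PrepareForWrite(bool* got_buffer) {
                 *got_buffer = true;
                 return Status::OK();
               }
               // With the write buffer in place, appends no longer fail for
               // lack of one.
               bool AddRow(TupleRow* row) { return true; }
             };

             Status PrepareProbePartition(BufferedTupleStream* stream) {
               bool got_buffer = false;
               Status status = stream->PrepareForWrite(&got_buffer);
               // Fail here, before any probe rows arrive.
               if (!status.ok() || !got_buffer) return status;
               // ...appending probe rows to this partition now always succeeds...
               return Status::OK();
             }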

          Testing:
          Ran exhaustive build and local stress test.

          Memory Usage:
          Ran stress test binary search locally for TPC-DS SF-1 and TPC-H SF-20.
          No regressions on TPC-DS. TPC-H either stayed the same or improved in
          min memory requirement without spilling, but the min memory requirement
          with spilling regressed in some cases. I investigated each of the
          significant regressions on TPC-H and determined that they were all due
          to exec nodes racing for spillable or non-spillable memory. None of them
          were cases where exec nodes got their minimum reservation and failed to
          execute the spilling algorithm correctly.

          Change-Id: I1e02ea9c7a7d1a0f373b11aa06c3237e1c7bd4cb
          Reviewed-on: http://gerrit.cloudera.org:8080/3873
          Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
          Reviewed-by: Dan Hecht <dhecht@cloudera.com>
          Tested-by: Internal Jenkins

          tarmstrong Tim Armstrong added a comment -

          IMPALA-3567: move ExecOption profile helpers to RuntimeProfile

          This is groundwork for IMPALA-3567, which will move some logic that
          previously resided in ExecNodes into DataSinks. We want to report
          ExecOption strings consistently in both ExecNodes and DataSinks,
          so that logic needs to move to a shared place (e.g. the RuntimeProfile
          itself).
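
           A sketch of what a shared helper of this shape could look like (the
           method name and locking below are illustrative assumptions, not the
           exact Impala API):

             #include <mutex>
             #include <string>

             // Hypothetical ExecOption helper on RuntimeProfile, callable from
             // both ExecNodes and DataSinks.
             class RuntimeProfile {
              public:
               // Append an option (e.g. "Codegen Enabled") to the comma-separated
               // ExecOption info string shown in the query profile.
               void AddExecOption(const std::string& option) {
                 std::lock_guard<std::mutex> l(lock_);
                 if (!exec_options_.empty()) exec_options_ += ", ";
                 exec_options_ += option;
               }
              private:
               std::mutex lock_;
               std::string exec_options_;
             };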

          I ran the patch through clang-format, which changed line wrapping and
          indentation of a few surrounding lines.

          Change-Id: I21c1dda8f8a1d92172bf59fbc1070a6834e61913
          Reviewed-on: http://gerrit.cloudera.org:8080/4188
          Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
          Tested-by: Internal Jenkins

          tarmstrong Tim Armstrong added a comment -

          IMPALA-3567: Part 1: groundwork to make Join build sides DataSinks

          Refactor DataSink interface to be more generic. We need more flexibility
          in setting up MemTrackers, so that memory is accounted against the right
          ExecNode. Also removes some redundancy between DataSink subclasses in
          setting up RuntimeProfiles, etc.

           Remove the redundancy in the DataSink interface between passing eos
           to Send() and calling FlushFinal(). This simplifies HdfsTableSink
           quite a bit and makes handling empty batches simpler.

          Partially refactor join nodes so that the control flow between
          BlockingJoinNode::Open() and its subclasses is easier to follow.
          BlockingJoinNode now only calls one virtual function on its
           subclasses: ConstructJoinBuild(). Once we convert all join nodes
           to use the DataSink interface, we will be able to remove that
           hook as well.
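
           The resulting control flow, sketched with simplified, hypothetical
           signatures (the real methods take a RuntimeState* and return
           Status):

             // Hypothetical, simplified shapes; illustration only.
             class BlockingJoinNode {
              public:
               virtual ~BlockingJoinNode() {}
               void Open() {
                 // ...common setup: open children, timers, async-build thread...
                 ConstructJoinBuild();  // the single subclass-specific step
                 // ...common post-build work...
               }
              protected:
               // Subclasses build their side here; once all join nodes use the
               // DataSink interface, this hook can be removed too.
               virtual void ConstructJoinBuild() = 0;
             };

             class NestedLoopJoinNode : public BlockingJoinNode {
              protected:
               void ConstructJoinBuild() override { /* accumulate build batches */ }
             };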

          As a minor optimisation, avoid updating a timer that is ignored for
          non-async builds.

          As a proof of concept, this patch separates out the build side of
          NestedLoopJoinNode into a class that implements the DataSink
          interface. The main challenge here is that NestedLoopJoinNode
          recycles row batches to avoid reallocations and copies of row
           batches in subplans. The solution (sketched after this list) is:

          • Retain the special-case optimisation for SingularRowSrc
          • Use the row batch cache and RowBatch::AcquireState() to copy
            the state of row batches passed to Send(), while recycling the
            RowBatch objects.
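
           A hedged sketch of the second point (RowBatch::AcquireState() is
           real; the builder and cache shown here are simplified stand-ins):

             #include <memory>
             #include <vector>

             // Simplified stand-in for Impala's RowBatch.
             struct RowBatch {
               // Transfers the rows and backing memory of 'src' into this
               // batch, leaving 'src' empty but immediately reusable.
               void AcquireState(RowBatch* src) {}
             };

             class NljBuilder {
              public:
               // Retain the rows for the build side while letting the caller
               // keep recycling its own RowBatch object.
               void Send(RowBatch* batch) {
                 RowBatch* copy = NextFromCache();
                 copy->AcquireState(batch);  // 'batch' can be reused upstream
                 build_batches_.push_back(copy);
               }
              private:
               RowBatch* NextFromCache() {
                 // Simplified: the real row batch cache reuses RowBatch objects.
                 cache_.push_back(std::make_unique<RowBatch>());
                 return cache_.back().get();
               }
               std::vector<std::unique_ptr<RowBatch>> cache_;
               std::vector<RowBatch*> build_batches_;
             };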

          Refactoring the partitioned hash join is left for Part 2.

          Testing:
           Ran exhaustive, core ASAN, and exhaustive non-partitioned agg/join builds.
          Also ran a local stress test.

          Performance:
          Ran TPC-H nested locally. The results show that the change is perf-neutral.

           ----------------- --------------------- ------- ---------- ---------- ---------------+
           Workload          File Format           Avg (s) Delta(Avg) GeoMean(s) Delta(GeoMean)
           ----------------- --------------------- ------- ---------- ---------- ---------------+
           TPCH_NESTED(_20)  parquet / none / none    7.75     +0.19%       5.64         -0.34%
           ----------------- --------------------- ------- ---------- ---------- ---------------+

           ----------------- -------- --------------------- ------ ----------- ---------- --------- -------------- ----------- ------+
           Workload          Query    File Format           Avg(s) Base Avg(s) Delta(Avg) StdDev(%) Base StdDev(%) Num Clients Iters
           ----------------- -------- --------------------- ------ ----------- ---------- --------- -------------- ----------- ------+
           TPCH_NESTED(_20)  TPCH-Q17 parquet / none / none  18.96       17.95     +5.61%     4.97%          0.71%           1     10
           TPCH_NESTED(_20)  TPCH-Q14 parquet / none / none   3.61        3.56     +1.25%     0.97%          1.19%           1     10
           TPCH_NESTED(_20)  TPCH-Q8  parquet / none / none   6.25        6.23     +0.44%     0.44%          0.90%           1     10
           TPCH_NESTED(_20)  TPCH-Q10 parquet / none / none   5.84        5.83     +0.30%     1.21%          1.82%           1     10
           TPCH_NESTED(_20)  TPCH-Q5  parquet / none / none   4.91        4.90     +0.11%     0.18%          0.78%           1     10
           TPCH_NESTED(_20)  TPCH-Q21 parquet / none / none  17.82       17.81     +0.07%     0.66%          0.58%           1     10
           TPCH_NESTED(_20)  TPCH-Q4  parquet / none / none   5.12        5.12     -0.02%     0.97%          1.23%           1     10
           TPCH_NESTED(_20)  TPCH-Q9  parquet / none / none  23.85       23.88     -0.15%     0.72%          0.22%           1     10
           TPCH_NESTED(_20)  TPCH-Q12 parquet / none / none   6.15        6.16     -0.16%     1.60%          1.72%           1     10
           TPCH_NESTED(_20)  TPCH-Q3  parquet / none / none   5.46        5.47     -0.23%     1.28%          0.90%           1     10
           TPCH_NESTED(_20)  TPCH-Q16 parquet / none / none   3.61        3.62     -0.26%     1.00%          1.36%           1     10
           TPCH_NESTED(_20)  TPCH-Q19 parquet / none / none  20.19       20.31     -0.58%     1.67%          0.65%           1     10
           TPCH_NESTED(_20)  TPCH-Q7  parquet / none / none   9.42        9.48     -0.68%     0.87%          0.71%           1     10
           TPCH_NESTED(_20)  TPCH-Q18 parquet / none / none  12.94       13.06     -0.90%     0.59%          0.48%           1     10
           TPCH_NESTED(_20)  TPCH-Q22 parquet / none / none   1.09        1.10     -0.92%     2.26%          2.22%           1     10
           TPCH_NESTED(_20)  TPCH-Q13 parquet / none / none   3.75        3.78     -0.94%     2.04%          2.86%           1     10
           TPCH_NESTED(_20)  TPCH-Q20 parquet / none / none   4.33        4.37     -1.10%     3.00%          2.43%           1     10
           TPCH_NESTED(_20)  TPCH-Q2  parquet / none / none   2.39        2.42     -1.38%     1.54%          1.30%           1     10
           TPCH_NESTED(_20)  TPCH-Q11 parquet / none / none   1.43        1.46     -1.78%     2.05%          2.77%           1     10
           TPCH_NESTED(_20)  TPCH-Q6  parquet / none / none   2.29        2.33     -1.79%     0.56%          1.23%           1     10
           TPCH_NESTED(_20)  TPCH-Q15 parquet / none / none   5.04        5.13     -1.84%     0.61%          2.01%           1     10
           TPCH_NESTED(_20)  TPCH-Q1  parquet / none / none   5.98        6.12     -2.30%     1.84%          3.19%           1     10
           ----------------- -------- --------------------- ------ ----------- ---------- --------- -------------- ----------- ------+

          Change-Id: I9d7608181eeacfe706a09c1e153d0a3e1ee9b475
          Reviewed-on: http://gerrit.cloudera.org:8080/3842
          Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
          Reviewed-by: Marcel Kornacker <marcel@cloudera.com>
          Tested-by: Internal Jenkins

          tarmstrong Tim Armstrong added a comment -

          I'm going to scope this down a little bit so that this JIRA doesn't include any work to make the build side shareable between threads. I'll close this JIRA once we have a clean separation of the join and build sides for NLJ and PHJ into separate classes.

           As the details of the multithreading implementation evolve, the
           join builders can be extended as needed.

          tarmstrong Tim Armstrong added a comment -

           If you look at, say, PHJ::PrepareNextPartition(), the logic for
           bringing a spilled build partition and its probe partition into
           memory is intermixed.

           For multi-threading broadcast joins the relationship between build
           and probe won't be one-to-one, so as preparation for that we want a
           cleaner separation between build and probe. We're going to need some
           kind of interface between the classes: the probe tells the build
           "I'm ready to process the next spilled partition", and the build
           tells the probe which spilled partition is to be processed next
           (assuming that the probe side is managing its own spilled probe
           rows).
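
           To make that concrete, a purely speculative sketch of such an
           interface (hypothetical names; this is a design discussion, not
           implemented code):

             class BuildPartition;  // spilled build partition, opaque here

             class SpilledBuildSide {
              public:
               virtual ~SpilledBuildSide() {}
               // Called by the probe side to say "I'm ready to process the
               // next spilled partition"; the build side answers with the
               // partition to bring into memory next, or nullptr when none
               // remain.
               virtual BuildPartition* NextSpilledPartition() = 0;
             };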

          mmokhtar Mostafa Mokhtar added a comment -

           When is the communication needed? Doesn't probing take care of that
           already?


             People

             • Assignee: tarmstrong Tim Armstrong
             • Reporter: tarmstrong Tim Armstrong
             • Votes: 0
             • Watchers: 3

               Dates

               • Created:
               • Updated:
               • Resolved:
