IMPALA-4495

Runtime filters are disabled based on stats before they even arrive, contributing to performance cliff on TPC-H Q2

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: Impala 2.7.0, Impala 2.8.0
    • Fix Version/s: Impala 2.8.0
    • Component/s: Backend

      Description

      The logic for disabling runtime filters based on stats is faulty. The issue is that runtime filters are evaluated even before they arrive. Evaluating a filter that has not yet arrived always returns true, so 'considered' is incremented but 'rejected' never is. This in turn leads the filter-disabling logic to conclude that the filter is ineffective. However, there is no way to know whether a filter is effective before it arrives.

      bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
        int num_filters = filter_ctxs_.size();
        for (int i = 0; i < num_filters; ++i) {
          LocalFilterStats* stats = &filter_stats_[i];
          if (!stats->enabled) continue;
          const RuntimeFilter* filter = filter_ctxs_[i]->filter;
          // Check filter effectiveness every ROWS_PER_FILTER_SELECTIVITY_CHECK rows.
          // TODO: The stats updates and the filter effectiveness check are executed very
          // frequently. Consider hoisting it out of this loop, and doing an equivalent
          // check less frequently, e.g., after producing an output batch.
          ++stats->total_possible;
          if (UNLIKELY(
              !(stats->total_possible & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)))) {
            double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
            if (filter->AlwaysTrue() ||
                reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
              stats->enabled = 0;
              continue;
            }
          }
          ++stats->considered;
          void* e = filter_ctxs_[i]->expr->GetValue(row);
          if (!filter->Eval<void>(e, filter_ctxs_[i]->expr->root()->type())) {
            ++stats->rejected;
            return false;
          }
        }
        return true;
      }
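
      To make the failure concrete, here is a minimal standalone sketch of the effectiveness check above, fed the stats a scanner accumulates while a filter has not yet arrived. The 0.1 threshold is an assumption (it matches the usual default of --parquet_min_filter_reject_ratio); the row count matches the logs below.

      // Minimal sketch of the effectiveness check above.
      // Assumption: 0.1 is the default of --parquet_min_filter_reject_ratio.
      #include <cstdint>
      #include <cstdio>

      int main() {
        const double kMinFilterRejectRatio = 0.1;  // assumed default
        int64_t considered = 16383;  // rows 'considered' before arrival
        int64_t rejected = 0;        // an absent filter rejects nothing
        double reject_ratio = rejected / static_cast<double>(considered);
        // 0.0 < 0.1, so the check permanently disables a filter that never
        // had a chance to reject a single row.
        std::printf("reject_ratio=%.3f -> filter %s\n", reject_ratio,
                    reject_ratio < kMinFilterRejectRatio ? "disabled" : "kept");
        return 0;
      }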
      

      I was able to reproduce this easily on TPC-H Q2 with scale factor 20 and runtime_filter_arrival_wait_time_ms=1. I added logging to prove that the filters were being disabled before they arrived:

      diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
      index 6b157aa..d1dfd92 100644
      --- a/be/src/exec/hdfs-parquet-scanner.cc
      +++ b/be/src/exec/hdfs-parquet-scanner.cc
      @@ -676,6 +676,8 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
             double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
             if (filter->AlwaysTrue() ||
                 reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
      +        LOG(INFO) << "Disabling filter " << filter->id() << " HasBloomFilter " << filter->HasBloomFilter()
      +                  << " rejected " << stats->rejected << " considered " << stats->considered;
               stats->enabled = 0;
               continue;
             }
      
      I1116 09:42:14.286149 26839 hdfs-parquet-scanner.cc:679] Disabling filter 3 HasBloomFilter 0 rejected 0 considered 16383
      I1116 09:42:15.449862 26890 hdfs-parquet-scanner.cc:679] Disabling filter 0 HasBloomFilter 0 rejected 0 considered 16383
      I1116 09:42:15.449887 26890 hdfs-parquet-scanner.cc:679] Disabling filter 4 HasBloomFilter 1 rejected 0 considered 16383
      I1116 09:42:15.473232 26891 hdfs-parquet-scanner.cc:679] Disabling filter 0 HasBloomFilter 0 rejected 0 considered 16383
      I1116 09:42:15.473254 26891 hdfs-parquet-scanner.cc:679] Disabling filter 4 HasBloomFilter 1 rejected 0 considered 16383
      I1116 09:42:20.298415 26900 hdfs-parquet-scanner.cc:679] Disabling filter 6 HasBloomFilter 1 rejected 0 considered 16383
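
      The repeated "considered 16383" in these logs follows from the masked check in the loop: it fires when total_possible is a multiple of ROWS_PER_FILTER_SELECTIVITY_CHECK, and 'considered' is only incremented after the check. A small sketch, assuming the constant is 16384 (a power of two, consistent with the logs):

      // Sketch of the masked counter check. Assumption:
      // ROWS_PER_FILTER_SELECTIVITY_CHECK == 16384; the mask trick requires a
      // power of two, and the value matches the 'considered 16383' log lines.
      #include <cassert>
      #include <cstdint>

      int main() {
        const int64_t kRowsPerCheck = 16384;
        int64_t total_possible = 0;
        int64_t considered = 0;
        for (int64_t row = 0; row < kRowsPerCheck; ++row) {
          ++total_possible;
          if (!(total_possible & (kRowsPerCheck - 1))) {
            // First firing of the check: 'considered' only covers the
            // preceding 16383 rows, exactly as logged.
            assert(considered == 16383);
          }
          ++considered;  // incremented after the check, as in the scanner loop
        }
        return 0;
      }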
      
      Attachments:

      1. TPC-DS-Q59.txt (847 kB), attached by Mostafa Mokhtar

        Activity

        mmokhtar Mostafa Mokhtar added a comment -

        Tim Armstrong

        Usually runtime_filter_arrival_wait_time_ms is set to 1 second or higher.
        Can the filter get disabled before runtime_filter_arrival_wait_time_ms expires?

        tarmstrong Tim Armstrong added a comment -

        The problem is that if runtime_filter_arrival_wait_time_ms expires, we still want to do row-level filtering once the filter eventually arrives.

        The scenario is:

        • runtime_filter_arrival_wait_time_ms expires.
        • 16383 rows are processed. The filter has no effect because it hasn't arrived.
        • The filter is disabled based on the bogus stats.
        • The filter arrives, but is never used because it was disabled.
        mmokhtar Mostafa Mokhtar added a comment -

        Makes sense, I think Q59 is hitting the same issue you explained here TPC-DS-Q59.txt

        tarmstrong Tim Armstrong added a comment -

        Yeah it looks like it's happening with filters 0, 2, 3, and 4 at least.

        The filter table:

        
         ID  Src. Node  Tgt. Node(s)  Targets     Target type  Partition filter  Pending (Expected)  First arrived  Completed   Enabled
        -------------------------------------------------------------------------------------------------------------------------------
          7         10             8       19           LOCAL              true              0 (19)            N/A        N/A      true
          6         14             9        1          REMOTE             false               0 (3)      928.714ms  928.716ms     false
          5         15             8       19          REMOTE             false               0 (3)      928.949ms  928.950ms     false
          4          2             0       19           LOCAL              true              0 (19)            N/A        N/A      true
          3          6             1        1          REMOTE             false               0 (3)        2s818ms    2s818ms     false
          2          7             0       19          REMOTE             false               0 (3)        2s804ms    2s804ms     false
          1         16             4        1          REMOTE             false              0 (19)       16s021ms   16s049ms     false
          0         16          1, 5     1, 1  REMOTE, REMOTE      false, false              0 (19)       16s020ms   16s226ms     false
        
              HDFS_SCAN_NODE (id=1):(Total: 2s494ms, non-child: 2s494ms, % non-child: 100.00%)
                 - AverageHdfsReadThreadConcurrency: 0.00 
                 - AverageScannerThreadConcurrency: 1.00 
                 - BytesRead: 586.52 KB (600601)
                 - BytesReadDataNodeCache: 0
                 - BytesReadLocal: 586.52 KB (600601)
                 - BytesReadRemoteUnexpected: 0
                 - BytesReadShortCircuit: 586.52 KB (600601)
                 - DecompressionTime: 318.440us
                 - MaxCompressedTextFileLength: 0
                 - NumColumns: 3 (3)
                 - NumDisksAccessed: 1 (1)
                 - NumRowGroups: 1 (1)
                 - NumScannerThreadsStarted: 1 (1)
                 - PeakMemoryUsage: 3.44 MB (3608196)
                 - PerReadThreadRawHdfsThroughput: 944.29 MB/sec
                 - RemoteScanRanges: 0 (0)
                 - RowBatchQueueGetWaitTime: 4.996ms
                 - RowBatchQueuePutWaitTime: 0.000ns
                 - RowsRead: 73.05K (73049)
                 - RowsReturned: 73.05K (73049)
                 - RowsReturnedRate: 29.28 K/sec
                 - ScanRangesComplete: 1 (1)
                 - ScannerThreadsInvoluntaryContextSwitches: 0 (0)
                 - ScannerThreadsTotalWallClockTime: 2s338ms
                   - MaterializeTupleTime(*): 4.461ms
                   - ScannerThreadsSysTime: 0.000ns
                   - ScannerThreadsUserTime: 5.999ms
                 - ScannerThreadsVoluntaryContextSwitches: 125 (125)
                 - TotalRawHdfsReadTime(*): 606.568us
                 - TotalReadThroughput: 0.00 /sec
                Filter 0 (1.00 MB):
                   - Rows processed: 16.38K (16383)
                   - Rows rejected: 0 (0)
                   - Rows total: 16.38K (16384)
                Filter 3 (1.00 MB):
                   - Rows processed: 16.38K (16383)
                   - Rows rejected: 0 (0)
                   - Rows total: 16.38K (16384)
        
        
                  Filter 2 (1.00 MB):
                     - Rows processed: 2.11M (2113407)
                     - Rows rejected: 49.86K (49864)
                     - Rows total: 2.11M (2113536)
                  Filter 4 (1.00 MB):
                     - Files processed: 132 (132)
                     - Files rejected: 3 (3)
                     - Files total: 132 (132)
                     - RowGroups processed: 426.66K (426662)
                     - RowGroups rejected: 0 (0)
                     - RowGroups total: 426.66K (426662)
                     - Rows processed: 2.11M (2113407)
                     - Rows rejected: 0 (0)
                     - Rows total: 2.11M (2113536)
                     - Splits processed: 129 (129)
                     - Splits rejected: 0 (0)
                     - Splits total: 129 (129)
        
        kwho Michael Ho added a comment -

        https://github.com/apache/incubator-impala/commit/1e306211d0eb511ff3e4f3f6f91d7e6fcd01af15

        IMPALA-3838, IMPALA-4495: Codegen EvalRuntimeFilters() and fixes filter stats updates

        This change codegens HdfsParquetScanner::EvalRuntimeFilters()
        by unrolling its loop, codegen'ing the expression evaluation
        of the runtime filter and replacing some type information with
        constants in the hashing function of runtime filter to avoid
        branching at runtime.

        This change also fixes IMPALA-4495 by not counting a row as
        'considered' in the filter stats before the filter arrives.
        This avoids unnecessarily marking a runtime filter as
        ineffective before it's even used.

        With this change, TPCDS-Q88 improves by 13-14%.
        primitive_broadcast_join_1 improves by 24%.

        Change-Id: I27114869840e268d17e91d6e587ef811628e3837
        Reviewed-on: http://gerrit.cloudera.org:8080/4833
        Reviewed-by: Michael Ho <kwho@cloudera.com>
        Tested-by: Internal Jenkins

        jrussell John Russell added a comment -

        I removed the specific performance numbers because those are usually not included in docs.

        I already had some wording about this JIRA; I think it was accurate, so I left it as is. You can verify here (the item mentions the IMPALA-4495 JIRA number):

        https://gerrit.cloudera.org/#/c/5668/4/docs/topics/impala_new_features.xml


          People

          • Assignee: kwho Michael Ho
          • Reporter: tarmstrong Tim Armstrong
          • Votes: 0
          • Watchers: 5
