IMPALA-5388

wrong results under stress with secure cluster

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: Impala 2.3.0, Impala 2.5.0, Impala 2.4.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0
    • Fix Version/s: Impala 2.9.0
    • Component/s: Distributed Exec

      Description

      The stress test under a secure (Kerberos + SSL) cluster finds that some queries are producing wrong results. I haven't yet been able to pin down why, but I'm going ahead and filing this bug to include what I have. Note that during the run, the queries do not always produce wrong results; only sometimes.

      Queries the stress test has reported as producing wrong results:

      tpch-q3, tpcds-q34, tpch-q12, tpch-q7

      In the case of tpch-q3, I managed to get a complete profile of a correct and incorrect run of the query. See attached.

      TPCH-Q3 is

      select
        l_orderkey,
        sum(l_extendedprice * (1 - l_discount)) as revenue,
        o_orderdate,
        o_shippriority
      from
        customer,
        orders,
        lineitem
      where
        c_mktsegment = 'BUILDING'
        and c_custkey = o_custkey
        and l_orderkey = o_orderkey
        and o_orderdate < '1995-03-15'
        and l_shipdate > '1995-03-15'
      group by
        l_orderkey,
        o_orderdate,
        o_shippriority
      order by
        revenue desc,
        o_orderdate
      limit 10
      

      I got as far as noticing that in the "wrong results" case, fewer rows are scanned:

      Results correct

      Operator              #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
      ----------------------------------------------------------------------------------------------------------------------------------
      13:MERGING-EXCHANGE        1  308.431us  308.431us       10          10          0              0  UNPARTITIONED
      06:TOP-N                   8   15.787ms   18.612ms       80          10   12.00 KB       580.00 B
      12:AGGREGATE               8  506.351ms  692.593ms    1.13M       1.73M   14.24 MB      105.17 MB  FINALIZE
      11:EXCHANGE                8   75.520ms  138.635ms    1.13M       1.73M          0              0  HASH(l_orderkey,o_orderdate...
      05:AGGREGATE               8  389.129ms  650.835ms    1.13M       1.73M   13.70 MB      105.17 MB  STREAMING
      04:HASH JOIN               8    1s901ms    2s576ms    2.99M       1.73M  153.17 MB       12.98 MB  INNER JOIN, PARTITIONED
      |--10:EXCHANGE             8  235.256ms  552.595ms    3.00M       3.00M          0              0  HASH(c_custkey)
      |  00:SCAN HDFS            5  323.828ms  621.551ms    3.00M       3.00M   29.30 MB      176.00 MB  tpch_100_parquet.customer
      09:EXCHANGE                8  728.297ms  758.348ms   14.57M       6.00M          0              0  HASH(o_custkey)
      03:HASH JOIN               8   24s679ms   29s349ms   14.57M       6.00M  777.11 MB       98.35 MB  INNER JOIN, PARTITIONED
      |--08:EXCHANGE             8    5s521ms    8s310ms   70.97M      15.00M          0              0  HASH(o_orderkey)
      |  01:SCAN HDFS            8    3s626ms    7s399ms   70.97M      15.00M   80.88 MB      352.00 MB  tpch_100_parquet.orders
      07:EXCHANGE                8   14s268ms   15s285ms  323.49M      60.00M          0              0  HASH(l_orderkey)
      02:SCAN HDFS               8   11s632ms   17s863ms  323.49M      60.00M   78.65 MB      352.00 MB  tpch_100_parquet.lineitem
      

      Results incorrect:

          ExecSummary:
      Operator              #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
      ----------------------------------------------------------------------------------------------------------------------------------
      13:MERGING-EXCHANGE        1  304.504us  304.504us       10          10          0              0  UNPARTITIONED
      06:TOP-N                   8   19.261ms   29.196ms       80          10   12.00 KB       580.00 B
      12:AGGREGATE               8  305.220ms  449.997ms    1.13M       1.73M   14.24 MB      105.17 MB  FINALIZE
      11:EXCHANGE                8   66.207ms   96.284ms    1.13M       1.73M          0              0  HASH(l_orderkey,o_orderdate...
      05:AGGREGATE               8  516.324ms  653.086ms    1.13M       1.73M   13.58 MB      105.17 MB  STREAMING
      04:HASH JOIN               8    1s217ms    1s461ms    2.99M       1.73M  153.17 MB       12.98 MB  INNER JOIN, PARTITIONED
      |--10:EXCHANGE             8  150.899ms  213.929ms    3.00M       3.00M          0              0  HASH(c_custkey)
      |  00:SCAN HDFS            5  937.452ms    1s753ms    3.00M       3.00M   29.09 MB      176.00 MB  tpch_100_parquet.customer
      09:EXCHANGE                8  563.317ms  581.895ms   11.04M       6.00M          0              0  HASH(o_custkey)
      03:HASH JOIN               8   24s420ms   28s126ms   11.04M       6.00M  649.11 MB       98.35 MB  INNER JOIN, PARTITIONED
      |--08:EXCHANGE             8    2s733ms    2s967ms   53.80M      15.00M          0              0  HASH(o_orderkey)
      |  01:SCAN HDFS            8   30s937ms   47s728ms   53.80M      15.00M   85.11 MB      352.00 MB  tpch_100_parquet.orders
      07:EXCHANGE                8   13s816ms   14s173ms  323.49M      60.00M          0              0  HASH(l_orderkey)
      02:SCAN HDFS               8   10s053ms   12s288ms  323.48M      60.00M   78.57 MB      352.00 MB  tpch_100_parquet.lineitem
      

          Activity

          mikesbrown Michael Brown added a comment - - edited

          For the query above, the wrong results returned in this case were:

          146180672 554130.5956 1995-02-22 0
          249739810 492136.8537 1995-02-28 0
          165214338 485645.5293 1995-03-09 0
          507274210 485256.2467 1995-03-10 0
          202071367 474746.4068 1995-03-12 0
          37307463 474325.1938 1995-02-06 0
          461984355 468002.5274 1995-03-13 0
          100810368 467932.7645 1995-02-28 0
          74015523 467786.8027 1995-03-14 0
          99950372 464215.9136 1995-02-17 0
          

          The correct results should be:

          249739810 492136.8537 1995-02-28 0
          165214338 485645.5293 1995-03-09 0
          507274210 485256.2467 1995-03-10 0
          202071367 474746.4068 1995-03-12 0
          37307463 474325.1938 1995-02-06 0
          461984355 468002.5274 1995-03-13 0
          100810368 467932.7645 1995-02-28 0
          74015523 467786.8027 1995-03-14 0
          402772582 463367.0665 1995-03-10 0
          367111712 462559.3977 1995-03-01 0
          

          Note that some of these rows match, but the row 146180672 554130.5956 1995-02-22 0 is wrong. It appears that row is supposed to be:

          146180672 277065.2978 1995-02-22 0
          

          making its corresponding SUM() wrong in the wrong-results example. In the correct case, this row shouldn't even appear because of the LIMIT clause.

          dhecht Dan Hecht added a comment -

          Interestingly, the wrong result of 554130.5956 is 2 * 277065.2978.
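
          As a quick check of that doubling relation, here is a standalone snippet (not Impala code; the constants are the values quoted in the comments above):

          #include <cassert>
          #include <cmath>
          #include <cstdio>

          int main() {
            // Revenue for order key 146180672: the value returned in the wrong-results
            // run vs. the expected value, both taken from the result sets quoted above.
            const double wrong_revenue = 554130.5956;
            const double expected_revenue = 277065.2978;

            // The wrong value is exactly twice the expected SUM(), to the precision shown.
            assert(std::fabs(wrong_revenue - 2.0 * expected_revenue) < 1e-6);
            std::printf("ratio = %.10f\n", wrong_revenue / expected_revenue);
            return 0;
          }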

          mikesbrown Michael Brown added a comment -

          I haven't been able to look at all the wrong results yet, but of the 4 queries that have problems, all 4 contain at least 1 aggregation (count or sum), a group by, and an order by.

          dhecht Dan Hecht added a comment -

          Michael Brown, given that the scan is producing a different number of results and row-level runtime filtering is kicking in, it might be helpful to turn off runtime filters (query option RUNTIME_FILTER_MODE=0) and see if it still reproduces (and capture the profiles with it disabled if possible).

          dhecht Dan Hecht added a comment -

          Michael Ho would you be able to help with this one?

          mikesbrown Michael Brown added a comment -

          I re-ran the test with RUNTIME_FILTER_MODE=0 set. For this run, I was not able to produce wrong results out of TPCH-Q3, but I was able to produce wrong results from TPCDS-Q68 (not mentioned before in this bug) and TPCDS-Q34 (seen before). I'm going to attach the profiles for TPCDS-Q68. Note: I don't see scan row count differences, but I do see a few differences in some of the operators (22, 12, 24, 13). In the TPCDS-Q68 case, this is how the results (also attached) differ:

          --- correct-tpcds-q68-d94aa36d3a2b7f12_72bd82a400000000_results.csv     2017-05-31 12:40:13.143647242 -0700
          +++ wrong-tpcds-q68-b4afd338536090b_47bd038400000000_results.csv        2017-05-31 12:42:46.295596997 -0700
          @@ -33,6 +33,7 @@
           ,Stephen,Friendship,Wilson,2538085,53963.49,1204.16,93236.30
           ,,,Newtown,2584610,15107.34,395.01,29855.71
           ,Terry,Deerfield,Harmony,2610027,26326.35,1274.91,44817.29
          +,Terry,Deerfield,Harmony,2610027,26326.35,1274.91,44817.29
           ,Justin,Brookville,Bethel,2638149,18262.28,901.17,50390.31
           ,Michael,White Oak,,2681333,11019.24,855.89,26544.04
           ,Shana,Edgewood,Kingston,2699001,25664.02,1279.39,59291.62
          @@ -96,4 +97,3 @@
           ,Robert,Arlington,Pleasant Hill,7600575,17992.51,911.03,50704.63
           ,Dewey,Oakdale,Bridgeport,7687376,18096.49,899.90,28831.03
           ,,Five Forks,Oakwood,7734977,23336.59,876.92,60526.08
          -,,Springfield,Sulphur Springs,7858247,23309.50,1173.47,58698.90
          

          Note the duplicate row and the absence of a row.

          This is TPCDS-Q68:

          select
            c_last_name,
            c_first_name,
            ca_city,
            bought_city,
            ss_ticket_number,
            extended_price,
            extended_tax,
            list_price
          from
            (select
              ss_ticket_number,
              ss_customer_sk,
              ca_city bought_city,
              sum(ss_ext_sales_price) extended_price,
              sum(ss_ext_list_price) list_price,
              sum(ss_ext_tax) extended_tax
            from
              store_sales,
              date_dim,
              store,
              household_demographics,
              customer_address
            where
              store_sales.ss_sold_date_sk = date_dim.d_date_sk
              and store_sales.ss_store_sk = store.s_store_sk
              and store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
              and store_sales.ss_addr_sk = customer_address.ca_address_sk
              and (household_demographics.hd_dep_count = 5
                or household_demographics.hd_vehicle_count = 3)
              and store.s_city in ('Midway', 'Fairview')
              and ss_sold_date_sk between 2451180 and 2451269
              and d_date between '1999-01-01' and '1999-03-31'
            group by
              ss_ticket_number,
              ss_customer_sk,
              ss_addr_sk,
              ca_city
            ) dn,
            customer,
            customer_address current_addr
          where
            ss_customer_sk = c_customer_sk
            and customer.c_current_addr_sk = current_addr.ca_address_sk
            and current_addr.ca_city <> bought_city
          order by
            c_last_name,
            ss_ticket_number
          limit 100;
          

          In this case, the extra row is wrong, and the missing row was pushed past the LIMIT by the presence of the extra, duplicate row.

          From the profiles:

          OK results

          Operator                #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
          ------------------------------------------------------------------------------------------------------------------------------------
          25:MERGING-EXCHANGE          1  893.145us  893.145us      100         100          0              0  UNPARTITIONED
          14:TOP-N                     7  194.693ms  434.773ms      700         100    2.64 MB       14.54 KB
          13:HASH JOIN                 7  543.257ms  807.976ms   27.78K       4.64K   11.37 MB      779.23 KB  INNER JOIN, BROADCAST
          |--24:EXCHANGE               7   25.744ms  101.806ms   28.00K       4.64K          0              0  BROADCAST
          |  12:HASH JOIN              6  368.666ms  424.172ms   28.00K       4.64K    2.06 MB       10.17 MB  INNER JOIN, PARTITIONED
          |  |--23:EXCHANGE            6    9.773ms   33.039ms   28.94K     802.16K          0              0  HASH(ss_customer_sk)
          |  |  21:AGGREGATE           8  364.394ms  890.913ms   28.94K     802.16K    2.67 MB       81.36 MB  FINALIZE
          |  |  20:EXCHANGE            8    5.247ms   28.668ms   28.94K     802.16K          0              0  HASH(ss_ticket_number,ss_cu...
          |  |  09:AGGREGATE           8  365.471ms    1s020ms   28.94K     802.16K    1.88 MB       81.36 MB  STREAMING
          |  |  08:HASH JOIN           8    1s705ms    2s080ms  307.58K     802.16K  145.22 MB       10.71 MB  INNER JOIN, PARTITIONED
          |  |  |--19:EXCHANGE         8  899.617ms    1s252ms    2.50M       2.50M          0              0  HASH(customer_address.ca_ad...
          |  |  |  04:SCAN HDFS        7    3s904ms   13s096ms    2.50M       2.50M    8.12 MB       32.00 MB  tpcds_300_decimal_parquet.c...
          |  |  18:EXCHANGE            8   55.988ms  162.593ms  309.42K     802.16K          0              0  HASH(store_sales.ss_addr_sk)
          |  |  07:HASH JOIN           8  173.736ms  516.461ms  309.42K     802.16K    8.60 MB      266.80 KB  INNER JOIN, BROADCAST
          |  |  |--17:EXCHANGE         8   47.999us   53.952us       90       7.30K          0              0  BROADCAST
          |  |  |  01:SCAN HDFS        1   34s010ms   34s010ms       90       7.30K    4.22 MB       32.00 MB  tpcds_300_decimal_parquet.d...
          |  |  06:HASH JOIN           8  199.490ms  443.448ms  309.42K     802.16K    4.42 MB       30.94 KB  INNER JOIN, BROADCAST
          |  |  |--16:EXCHANGE         8    1.726ms   10.860ms    1.80K       1.80K          0              0  BROADCAST
          |  |  |  03:SCAN HDFS        1   10s369ms   10s369ms    1.80K       1.80K  379.21 KB       48.00 MB  tpcds_300_decimal_parquet.h...
          |  |  05:HASH JOIN           8  392.606ms  470.851ms    1.26M       3.21M    2.96 MB        1.84 KB  INNER JOIN, BROADCAST
          |  |  |--15:EXCHANGE         8   29.577us   31.445us       49          52          0              0  BROADCAST
          |  |  |  02:SCAN HDFS        1   32s131ms   32s131ms       49          52  148.03 KB       32.00 MB  tpcds_300_decimal_parquet.s...
          |  |  00:SCAN HDFS           8       1m7s      1m48s   24.81M      24.81M   30.91 MB      192.00 MB  tpcds_300_decimal_parquet.s...
          |  22:EXCHANGE               6  736.759ms  886.591ms    5.00M       5.00M          0              0  HASH(c_customer_sk)
          |  10:SCAN HDFS              6    6s403ms   20s219ms    5.00M       5.00M   21.79 MB      192.00 MB  tpcds_300_decimal_parquet.c...
          11:SCAN HDFS                 7    3s022ms    6s232ms    2.50M       2.50M    6.08 MB       32.00 MB  tpcds_300_decimal_parquet.c...
          

          Wrong results

          Operator                #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail
          ------------------------------------------------------------------------------------------------------------------------------------
          25:MERGING-EXCHANGE          1    1.267ms    1.267ms      100         100          0              0  UNPARTITIONED
          14:TOP-N                     7  276.408ms  516.676ms      700         100    2.62 MB       14.54 KB
          13:HASH JOIN                 7  727.081ms    1s544ms   27.83K       4.64K   11.37 MB      779.23 KB  INNER JOIN, BROADCAST
          |--24:EXCHANGE               7   42.220ms   78.747ms   28.05K       4.64K          0              0  BROADCAST
          |  12:HASH JOIN              6  371.353ms  765.500ms   28.05K       4.64K    2.06 MB       10.17 MB  INNER JOIN, PARTITIONED
          |  |--23:EXCHANGE            6   14.994ms   70.869ms   28.94K     802.16K          0              0  HASH(ss_customer_sk)
          |  |  21:AGGREGATE           8  571.141ms    2s047ms   28.94K     802.16K    2.67 MB       81.36 MB  FINALIZE
          |  |  20:EXCHANGE            8    2.351ms    8.942ms   28.94K     802.16K          0              0  HASH(ss_ticket_number,ss_cu...
          |  |  09:AGGREGATE           8  445.488ms    1s488ms   28.94K     802.16K    1.89 MB       81.36 MB  STREAMING
          |  |  08:HASH JOIN           8    2s503ms    4s987ms  307.58K     802.16K  145.22 MB       10.71 MB  INNER JOIN, PARTITIONED
          |  |  |--19:EXCHANGE         8    1s584ms    3s210ms    2.50M       2.50M          0              0  HASH(customer_address.ca_ad...
          |  |  |  04:SCAN HDFS        7    3s397ms   20s090ms    2.50M       2.50M    8.12 MB       32.00 MB  tpcds_300_decimal_parquet.c...
          |  |  18:EXCHANGE            8  450.027ms    2s398ms  309.42K     802.16K          0              0  HASH(store_sales.ss_addr_sk)
          |  |  07:HASH JOIN           8  190.655ms  896.168ms  309.42K     802.16K    8.33 MB      266.80 KB  INNER JOIN, BROADCAST
          |  |  |--17:EXCHANGE         8    2.694ms   21.113ms       90       7.30K          0              0  BROADCAST
          |  |  |  01:SCAN HDFS        1      2m36s      2m36s       90       7.30K    4.22 MB       32.00 MB  tpcds_300_decimal_parquet.d...
          |  |  06:HASH JOIN           8  120.423ms  340.642ms  309.42K     802.16K    4.36 MB       30.94 KB  INNER JOIN, BROADCAST
          |  |  |--16:EXCHANGE         8  697.420us    2.455ms    1.80K       1.80K          0              0  BROADCAST
          |  |  |  03:SCAN HDFS        1    3s134ms    3s134ms    1.80K       1.80K  379.21 KB       48.00 MB  tpcds_300_decimal_parquet.h...
          |  |  05:HASH JOIN           8  377.919ms  480.362ms    1.26M       3.21M    2.96 MB        1.84 KB  INNER JOIN, BROADCAST
          |  |  |--15:EXCHANGE         8   43.272us   59.414us       49          52          0              0  BROADCAST
          |  |  |  02:SCAN HDFS        1      2m14s      2m14s       49          52  148.03 KB       32.00 MB  tpcds_300_decimal_parquet.s...
          |  |  00:SCAN HDFS           8   34s359ms      2m29s   24.81M      24.81M   40.21 MB      192.00 MB  tpcds_300_decimal_parquet.s...
          |  22:EXCHANGE               6  860.720ms    1s281ms    5.01M       5.00M          0              0  HASH(c_customer_sk)
          |  10:SCAN HDFS              6    7s881ms   41s393ms    5.00M       5.00M   22.76 MB      192.00 MB  tpcds_300_decimal_parquet.c...
          11:SCAN HDFS                 7    3s153ms    5s670ms    2.50M       2.50M    3.95 MB       32.00 MB  tpcds_300_decimal_parquet.c...
          

          It's possible the TPCDS-Q68 issue is a different bug; let me know if you want a different bug filed. In any case, I'll try to keep reproducing the Q3 problem, and try to look at Q34 as well.

          mikesbrown Michael Brown added a comment - - edited

          As for TPCH-Q3, for whatever reason, despite running 4000 queries, the stress test never completed this query, so there are no results to compare or profiles to get. It's not possible to tell from the log if it kept getting cancelled, or was timed out, or what. I'll keep trying to get a Q3 profile since we already have one of those.

          kwho Michael Ho added a comment -

          Comparing the two tpcds-q68 profiles, it appears that we first diverge at exchange node id=22. In particular, scan node id=10 produced 833333 rows per fragment instance on average in both cases, but in the case with the wrong results, exchange node id=22 somehow returned more rows than the scan node produced. That led to more probe rows being fed into PHJ node id=12, which caused more rows to be returned upstream in the plan.

          Correct results (Avg Fragment statistics):

                HASH_JOIN_NODE (id=12)
                  - BuildRows: 4823 
                  - BuildTime: 41017783
                  - InactiveTotalTime: 0
                  - LocalTime: 368666612
                  - NumHashTableBuildsSkipped: 0
                  - PeakMemoryUsage: 1981238
                  - ProbeRows: 833333  <<------
                  - ProbeRowsPartitioned: 0
                  - ProbeTime: 207176400
                  - RowsReturned: 4666  <<------
                  - RowsReturnedRate: 26
                  - TotalTime: 172093637772
                  Hash Join Builder (join_node_id=12)
                    - BuildRowsPartitionTime: 11434272
                    - BuildRowsPartitioned: 4823
                    - GetNewBlockTime: 5798385
                    - HashBuckets: 8192
                    - HashCollisions: 0
                    - HashTablesBuildTime: 23061446
                    - InactiveTotalTime: 0
                    - LargestPartitionPercent: 6
                    - MaxPartitionLevel: 0
                    - NumRepartitions: 0
                    - PartitionsCreated: 16
                    - PeakMemoryUsage: 1878485
                    - PinTime: 0
                    - RepartitionTime: 0
                    - SpilledPartitions: 0
                    - TotalTime: 0
                    - UnpinTime: 0
                  EXCHANGE_NODE (id=23)
                    - BytesReceived: 259360
                    - ConvertRowBatchTime: 1065572
                    - DeserializeRowBatchTimer: 40991994
                    - FirstBatchArrivalWaitTime: 149625481535
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 4823
                    - RowsReturnedRate: 31
                    - SendersBlockedTimer: 0
                    - SendersBlockedTotalTimer(*): 0
                    - TotalTime: 149804771602
                  EXCHANGE_NODE (id=22)
                    - BytesReceived: 31057531
                    - ConvertRowBatchTime: 323436053
                    - DeserializeRowBatchTimer: 5179099024
                    - FirstBatchArrivalWaitTime: 0
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 833333   <<------
                    - RowsReturnedRate: 38018
                    - SendersBlockedTimer: 135348327757
                    - SendersBlockedTotalTimer(*): 511378794857
                    - TotalTime: 21920199558
          
                HDFS_SCAN_NODE (id=10)
                  - AverageHdfsReadThreadConcurrency: 0.0
                  - AverageScannerThreadConcurrency: 0.9842496806452462
                  - BytesRead: 10648538
                  - BytesReadDataNodeCache: 0
                  - BytesReadLocal: 10648538
                  - BytesReadRemoteUnexpected: 0
                  - BytesReadShortCircuit: 10648538
                  - DecompressionTime: 143249796
                  - InactiveTotalTime: 0
                  - MaxCompressedTextFileLength: 0
                  - NumColumns: 4
                  - NumDictFilteredRowGroups: 0
                  - NumDisksAccessed: 1
                  - NumRowGroups: 1
                  - NumScannerThreadsStarted: 1
                  - NumScannersWithNoReads: 0
                  - NumStatsFilteredRowGroups: 0
                  - PeakMemoryUsage: 22151612
                  - PerReadThreadRawHdfsThroughput: 337379422
                  - RemoteScanRanges: 0
                  - RowBatchQueueGetWaitTime: 5623630595
                  - RowBatchQueuePutWaitTime: 150273849222
                  - RowsRead: 833333
                  - RowsReturned: 833333    <<------
                  - RowsReturnedRate: 345451
                  - ScanRangesComplete: 1
                  - ScannerThreadsInvoluntaryContextSwitches: 214
                  - ScannerThreadsTotalWallClockTime: 160887810881
                    - MaterializeTupleTime(*): 3756166158
                    - ScannerThreadsSysTime: 21496000
                    - ScannerThreadsUserTime: 861701833
                  - ScannerThreadsVoluntaryContextSwitches: 1123
                  - TotalRawHdfsReadTime(*): 31026223
                  - TotalReadThroughput: 64880
                  - TotalTime: 6403569677
          

          Wrong results:

                HASH_JOIN_NODE (id=12)
                  - BuildRows: 4823 
                  - BuildTime: 70158637
                  - InactiveTotalTime: 0
                  - LocalTime: 371353507
                  - NumHashTableBuildsSkipped: 0
                  - PeakMemoryUsage: 1981242
                  - ProbeRows: 834892  <<------
                  - ProbeRowsPartitioned: 0
                  - ProbeTime: 173335608
                  - RowsReturned: 4675    <<------
                  - RowsReturnedRate: 13
                  - TotalTime: 331755407776
                  Hash Join Builder (join_node_id=12)
                    - BuildRowsPartitionTime: 18249579
                    - BuildRowsPartitioned: 4823
                    - GetNewBlockTime: 19402821
                    - HashBuckets: 8192
                    - HashCollisions: 0
                    - HashTablesBuildTime: 30909237
                    - InactiveTotalTime: 0
                    - LargestPartitionPercent: 6
                    - MaxPartitionLevel: 0
                    - NumRepartitions: 0
                    - PartitionsCreated: 16
                    - PeakMemoryUsage: 1878485
                    - PinTime: 0
                    - RepartitionTime: 0
                    - SpilledPartitions: 0
                    - TotalTime: 0
                    - UnpinTime: 0
                  EXCHANGE_NODE (id=23)
                    - BytesReceived: 259364
                    - ConvertRowBatchTime: 1244326
                    - DeserializeRowBatchTimer: 92528899
                    - FirstBatchArrivalWaitTime: 311960446875
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 4823
                    - RowsReturnedRate: 14
                    - SendersBlockedTimer: 0
                    - SendersBlockedTotalTimer(*): 0
                    - TotalTime: 312523086174
                  EXCHANGE_NODE (id=22)
                    - BytesReceived: 30365221
                    - ConvertRowBatchTime: 227659670
                    - DeserializeRowBatchTimer: 5080531802
                    - FirstBatchArrivalWaitTime: 0
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 834892  <<------
                    - RowsReturnedRate: 44328
                    - SendersBlockedTimer: 300209431438
                    - SendersBlockedTotalTimer(*): 1670817490111
                    - TotalTime: 18860968094
          
                HDFS_SCAN_NODE (id=10)
                  - AverageHdfsReadThreadConcurrency: 2.5290844714213456E-4
                  - AverageScannerThreadConcurrency: 0.9910697955813615
                  - BytesRead: 10648538
                  - BytesReadDataNodeCache: 0
                  - BytesReadLocal: 10648538
                  - BytesReadRemoteUnexpected: 0
                  - BytesReadShortCircuit: 10648538
                  - DecompressionTime: 122972301
                  - InactiveTotalTime: 0
                  - MaxCompressedTextFileLength: 0
                  - NumColumns: 4
                  - NumDictFilteredRowGroups: 0
                  - NumDisksAccessed: 1
                  - NumRowGroups: 1
                  - NumScannerThreadsStarted: 1
                  - NumScannersWithNoReads: 0
                  - NumStatsFilteredRowGroups: 0
                  - PeakMemoryUsage: 22320735
                  - PerReadThreadRawHdfsThroughput: 613714338
                  - RemoteScanRanges: 0
                  - RowBatchQueueGetWaitTime: 7184287448
                  - RowBatchQueuePutWaitTime: 311705060700
                  - RowsRead: 833333
                  - RowsReturned: 833333  <<------
                  - RowsReturnedRate: 671305
                  - ScanRangesComplete: 1
                  - ScannerThreadsInvoluntaryContextSwitches: 239
                  - ScannerThreadsTotalWallClockTime: 324721775100
                    - MaterializeTupleTime(*): 5170738686
                    - ScannerThreadsSysTime: 43492833
                    - ScannerThreadsUserTime: 938190333
                  - ScannerThreadsVoluntaryContextSwitches: 1219
                  - TotalRawHdfsReadTime(*): 21713145
                  - TotalReadThroughput: 33276
                  - TotalTime: 7881406304
          
          mikesbrown Michael Brown added a comment -

          Michael Ho asked for an additional TPCDS-Q68 profile when results are correct; another-correct-tpcds-68-profile.txt uploaded.

          kwho Michael Ho added a comment - - edited

          The second profile uploaded by Michael Brown confirmed that there are indeed more probe rows received by the exchange node (id=22) than sent by the scan node (id=10) in the query with wrong results. Both profiles with correct results show the same number of probe rows.

          kwho Michael Ho added a comment -

          Stared at the code in data-stream-recvr.cc and data-stream-sender.cc. Nothing suspicious so far.

          Michael Brown, please repro with the previously released version of impalad to help establish a baseline. Again, please disable runtime filters to avoid non-determinism.

          kwho Michael Ho added a comment - - edited

          Michael Brown, do you have the logs available for the timeframe when the query with the wrong results occurred? I wonder if it's possible that we're getting a spurious RPC failure that results in a re-send of the same row batch even though the other side already received it.
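
          To illustrate the failure mode hypothesized here, below is a minimal, self-contained sketch (not Impala code; all names are hypothetical) of how a sender that blindly retries after an ambiguous RPC failure can get the same row batch counted twice unless the receiver deduplicates, for example with a per-sender sequence number:

          #include <cstdint>
          #include <cstdio>

          // Hypothetical stand-ins for a row batch and a stream receiver.
          struct RowBatch { int64_t seq_no; int64_t num_rows; };

          struct Receiver {
            int64_t next_expected_seq = 0;
            int64_t rows_received = 0;

            // Without the seq_no check, a retried (duplicate) batch is counted twice.
            void Add(const RowBatch& batch) {
              if (batch.seq_no < next_expected_seq) {
                std::printf("dropping duplicate batch seq=%lld\n", (long long)batch.seq_no);
                return;  // Already applied; the retry carries no new rows.
              }
              next_expected_seq = batch.seq_no + 1;
              rows_received += batch.num_rows;
            }
          };

          int main() {
            Receiver recvr;
            RowBatch batch{/*seq_no=*/0, /*num_rows=*/1024};

            // First attempt: the receiver applies the batch, but suppose the sender's RPC
            // appears to fail (e.g. a timeout waiting for the response), so the sender
            // cannot tell whether the batch was applied.
            recvr.Add(batch);

            // A blind re-send of the same batch: without deduplication this would add
            // another 1024 rows, the kind of inflation seen at EXCHANGE_NODE (id=22).
            recvr.Add(batch);

            std::printf("rows_received=%lld (expected 1024)\n", (long long)recvr.rows_received);
            return 0;
          }

          This only illustrates the hazard being described; whether the actual TransmitData path can re-send an already-applied batch is what the code snippet quoted further down is looking at.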

          mikesbrown Michael Brown added a comment -

          I'll set up another test with the previous impala release.

          kwho Michael Ho added a comment -

          FWIW, I am referring to the code snippet below in my previous comment. cc'ing Henry Robinson and Sailesh Mukil

          Status DataStreamSender::Channel::DoTransmitDataRpc(ImpalaBackendConnection* client,
              const TTransmitDataParams& params, TTransmitDataResult* res) {
            // Send the row batch to the remote receiver and wait for the response.
            Status status = client->DoRpc(&ImpalaBackendClient::TransmitData, params, res);
            // If receiving the response times out, keep retrying the receive (not the
            // send) until it succeeds, fails with another error, or the query is cancelled.
            while (status.code() == TErrorCode::RPC_RECV_TIMEOUT &&
                !runtime_state_->is_cancelled()) {
              status = client->RetryRpcRecv(&ImpalaBackendClient::recv_TransmitData, res);
            }
            return status;
          }
          
          dhecht Dan Hecht added a comment -

          I think you can control that timeout with FLAGS_backend_client_rpc_timeout_ms (and I think setting it to 0 disables it).

          sailesh Sailesh Mukil added a comment - - edited

          Michael Ho If that were the case, wouldn't it be the same issue on a non-secure cluster?

          Also, that code retries the recv call, not the send call.

          Something more interesting: even though the wrong-results run returned more rows, it received fewer bytes.

          Correct results:
          EXCHANGE_NODE (id=22)

          • BytesReceived: 31057531
          • RowsReturned: 833333

          Wrong results:
          EXCHANGE_NODE (id=22)

          • BytesReceived: 30365221
          • RowsReturned: 834892
          kwho Michael Ho added a comment -

          Sailesh Mukil, good point. I misread the part about recv_TransmitData, which is not the same as TransmitData. I was thinking the secure cluster has more overhead, so it may be more prone to connection issues under the stress test. Guess my theory doesn't hold water.

          kwho Michael Ho added a comment -

          Correct results:
          EXCHANGE_NODE (id=22)
          BytesReceived: 31057531
          RowsReturned: 833333
          Wrong results:
          EXCHANGE_NODE (id=22)
          BytesReceived: 30365221
          RowsReturned: 834892

          The TRowBatch is probably compressed, so it may just happen that the one with wrong results compresses slightly better.

          sailesh Sailesh Mukil added a comment -

          Michael Ho Yes, that makes sense, especially after looking at the following. I matched the DataStreamSender profiles to the corresponding Exchange node profiles (which contain the receiver information):

          Correct Profile:

                DataStreamSender (dst_id=22)
                  - BytesSent: 31057531   <<------
                  - InactiveTotalTime: 0
                  - NetworkThroughput(*): 166336656
                  - OverallThroughput: 14622105
                  - PeakMemoryUsage: 188064
                  - RowsReturned: 833333   <<------
                  - SerializeBatchTime: 1906849044
                  - TotalTime: 2097199460
                  - TransmitDataRPCTime: 186288843
                  - UncompressedRowBatchSize: 53801824
          
                  EXCHANGE_NODE (id=22)
                    - BytesReceived: 31057531   <<------
                    - ConvertRowBatchTime: 323436053
                    - DeserializeRowBatchTimer: 5179099024
                    - FirstBatchArrivalWaitTime: 0
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 833333   <<------
                    - RowsReturnedRate: 38018
                    - SendersBlockedTimer: 135348327757
                    - SendersBlockedTotalTimer(*): 511378794857
                    - TotalTime: 21920199558
          

          Rows returned and rows sent match between the DataStreamSender and the DataStreamRecvr. BytesReceived and BytesSent match too.

          Wrong results profile:

                DataStreamSender (dst_id=22)
                  - BytesSent: 30307686  <<------
                  - InactiveTotalTime: 0
                  - NetworkThroughput(*): 144476234
                  - OverallThroughput: 16335557
                  - PeakMemoryUsage: 188064
                  - RowsReturned: 833333  <<------
                  - SerializeBatchTime: 1653715919
                  - TotalTime: 1865421891
                  - TransmitDataRPCTime: 209513155
                  - UncompressedRowBatchSize: 53801824
          
                  EXCHANGE_NODE (id=22)
                    - BytesReceived: 30365221  <<------
                    - ConvertRowBatchTime: 227659670
                    - DeserializeRowBatchTimer: 5080531802
                    - FirstBatchArrivalWaitTime: 0
                    - InactiveTotalTime: 0
                    - PeakMemoryUsage: 0
                    - RowsReturned: 834892  <<------
                    - RowsReturnedRate: 44328
                    - SendersBlockedTimer: 300209431438
                    - SendersBlockedTotalTimer(*): 1670817490111
                    - TotalTime: 18860968094
          

          The DataStreamSender records having sent the right number of rows; however, the DataStreamRecvr receives 1559 more rows. BytesReceived is also greater than BytesSent, by 57535 bytes.
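
          For reference, those deltas follow directly from the counters quoted above; a standalone cross-check (not Impala code) of the sender-side vs. receiver-side numbers:

          #include <cstdint>
          #include <cstdio>

          struct StreamCounters { int64_t rows; int64_t bytes; };

          // Compare a DataStreamSender's counters against the matching exchange node's
          // receiver-side counters and print the discrepancies.
          static void CheckStream(const char* label, StreamCounters sent, StreamCounters recvd) {
            std::printf("%s: rows %lld -> %lld (delta %+lld), bytes %lld -> %lld (delta %+lld)\n",
                        label,
                        (long long)sent.rows, (long long)recvd.rows,
                        (long long)(recvd.rows - sent.rows),
                        (long long)sent.bytes, (long long)recvd.bytes,
                        (long long)(recvd.bytes - sent.bytes));
          }

          int main() {
            // Values quoted from the dst_id=22 / id=22 profiles above.
            CheckStream("correct run", {833333, 31057531}, {833333, 31057531});
            CheckStream("wrong run", {833333, 30307686}, {834892, 30365221});
            // The wrong run shows +1559 rows and +57535 bytes on the receiving side.
            return 0;
          }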

          kwho Michael Ho added a comment -

          Yes, that's the same as what I found. I also tried comparing the number of rows received by each exchange node (one per partition in this case), and interestingly, when sorted by rows returned they are off by fixed offsets. Not sure if there is much to make of it, though:

          Correct result (for the two profiles attached)

                      - RowsReturned: 832352
                      - RowsReturned: 832457
                      - RowsReturned: 832824
                      - RowsReturned: 833445
                      - RowsReturned: 834188
                      - RowsReturned: 834734
          

          Wrong results:

                      - RowsReturned: 833688 (off by 1336)
                      - RowsReturned: 833793 (off by 1336)
                      - RowsReturned: 834494 (off by 1670)
                      - RowsReturned: 835115 (off by 1670)
                      - RowsReturned: 835858 (off by 1670)
                      - RowsReturned: 836404 (off by 1670)
          
          mikesbrown Michael Brown added a comment - edited

          A build based off Impala 2.8 showed 1 query in 10,000 with the same symptoms: more rows shown in the exchange. The net effect is that the final result had 29 additional, duplicate rows. This time it was TPCDS-Q34, a query already known to produce wrong results in 2.9 under secure stress. 2 good profiles, 1 bad profile, and good/bad results are attached, all with a 2.8- filename prefix.

          henryr Henry Robinson added a comment -

          Michael Brown - thanks for digging into 2.8. Is it fair to say that this only reproduces on secure clusters - that these stress tests get run on non-secure clusters often enough that we would have seen it there by now if it reproduced at the same frequency?

          mikesbrown Michael Brown added a comment -

          Henry Robinson Correct; it's fair to say that.

          henryr Henry Robinson added a comment -

          Michael Brown - do you have log files (and were those at GLOG_v >= 1)? There's a log message ("unexpected exception: ") that would be useful to look for.

          Sailesh Mukil - do you see any way that ClientConnection::DoRpc could retry an RPC incorrectly if there was some SASL / TLS-based exception? Nothing jumps out at me right now.

          kwho Michael Ho added a comment -

          Henry Robinson, Sailesh Mukil, I do see the possibility of the same RPC being issued twice in DoRpc, shown below.

            Status DoRpc(const F& f, const Request& request, Response* response,
                bool* retry_is_safe = NULL) {
              DCHECK(response != NULL);
              client_is_unrecoverable_ = true;
              if (retry_is_safe != NULL) *retry_is_safe = false;
              try {
                (client_->*f)(*response, request);
              } catch (const apache::thrift::TApplicationException& e) {
                // TApplicationException only happens in recv RPC call.
                // which means send RPC call is done, should not retry.
                return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
              } catch (const apache::thrift::TException& e) {
                if (IsRecvTimeoutTException(e)) {
                  return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
                      "Client $0 timed-out during recv call.", TNetworkAddressToString(address_)));
                }
                VLOG(1) << "client " << client_ << " unexpected exception: "
                        << e.what() << ", type=" << typeid(e).name();
          
                // Client may have unexpectedly been closed, so re-open and retry.
                // TODO: ThriftClient should return proper error codes.
                const Status& status = Reopen();
                if (!status.ok()) {
                  if (retry_is_safe != NULL) *retry_is_safe = true;
                  return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, status.GetDetail());
                }
                try {
                  (client_->*f)(*response, request);
                } catch (apache::thrift::TException& e) {
                  // By this point the RPC really has failed.
                  // TODO: Revisit this logic later. It's possible that the new connection
                  // works but we hit timeout here.
                  return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
                }
              }
          

          There were error messages like the following in the log files, which suggest this catch block may have been taken.

          ./vc0706.halxg.cloudera.com/impalad.vc0706.halxg.cloudera.com.impala.log.INFO.20170525-133204.77219:I0525 13:52:18.380208 121224 client-cache.h:260] client 0x7fe6187a3cc0 unexpected exception: SSL_read: Resource temporarily unavailable, type=N6apache6thrift9transport13TSSLExceptionE
          
          henryr Henry Robinson added a comment -

          Ok, that looks like the bug then. The exception gets thrown during the first invocation, doesn't hit any of the special cases (it isn't a TApplicationException or a timeout exception), and the RPC gets retried. The general mistake is assuming that a receive-side error always surfaces as a TApplicationException; the implication only goes one way - with TLS enabled we can get a receive-side error without a TApplicationException being thrown. Sailesh Mukil - make sense?

          henryr Henry Robinson added a comment -

          The thrown exception is a TSSLException, which is not a TApplicationException; that is why the generic TException catch branch gets taken.

          sailesh Sailesh Mukil added a comment - edited

          Henry Robinson Yes, that looks about right. Great catch.

          TSSLException inherits from TTransportException, which inherits from TException, so in the above code it gets incorrectly treated as a send-side error.

          The problem is that even send-side errors throw TSSLException, so it would be hard to differentiate a send failure from a recv failure.
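
          As a minimal, self-contained sketch of that catch-order behavior (assuming only the Thrift hierarchy TSSLException -> TTransportException -> TException; the exact header locations vary across Thrift versions), an error surfaced by the SSL transport during the recv phase lands in the generic TException handler, which the old DoRpc() treated as a retryable send failure:

            // Sketch only, not Impala code. Shows why a TSSLException thrown while
            // receiving the RPC response is caught by the generic TException branch.
            #include <iostream>
            #include <thrift/Thrift.h>                   // TException
            #include <thrift/TApplicationException.h>    // TApplicationException (header location varies by Thrift version)
            #include <thrift/transport/TSSLSocket.h>     // TSSLException

            using apache::thrift::TApplicationException;
            using apache::thrift::TException;
            using apache::thrift::transport::TSSLException;

            int main() {
              try {
                // Emulate the failure seen in the logs during the recv phase of a TLS connection.
                throw TSSLException("SSL_read: Resource temporarily unavailable");
              } catch (const TApplicationException& e) {
                std::cout << "recv-side error, do not retry: " << e.what() << "\n";
              } catch (const TException& e) {
                // TSSLException lands here, so the old code reopened the connection and
                // re-issued the RPC even though the row batch may already have been delivered.
                std::cout << "treated as retryable: " << e.what() << "\n";
              }
              return 0;
            }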

          kwho Michael Ho added a comment -

          https://github.com/apache/incubator-impala/commit/7db2d3064620b984f7dd9f8cd747dc45a4553a9c

          IMPALA-5388: Only retry RPC on lost connection in send call
          Previously, DoRpc() blacklists only a couple of conditions
          which shouldn't retry the RPC on exception. This is fragile
          as the errors could have happened after the payload has been
          successfully sent to the destination. Such aggressive retry
          behavior can lead to duplicated row batches being sent, causing
          wrong results in queries.

          This change fixes the problem by whitelisting the conditions
          in which the RPC can be retried. Specifically, it pattern-matches
          against certain errors in TSocket::write_partial() in the thrift
          library and only retries the RPC in those cases. With SSL enabled,
          we will never retry. We should investigate whether there are some
          cases in which it's safe to retry.

          This change also adds fault injection in the TransmitData() RPC
          caller's path to emulate different exception cases.

          Change-Id: I176975f2aa521d5be8a40de51067b1497923d09b
          Reviewed-on: http://gerrit.cloudera.org:8080/7063
          Reviewed-by: Michael Ho <kwho@cloudera.com>
          Tested-by: Impala Public Jenkins
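
          As a rough sketch of the whitelisting idea the commit describes (the helper names and the matched strings below are illustrative assumptions, not the actual patch), the decision becomes: retry only when the failure clearly happened while writing the request, and never when SSL is in use, since send and recv failures are then indistinguishable:

            // Sketch only, not the actual Impala patch: a whitelist-style retry decision.
            #include <cstring>
            #include <thrift/transport/TTransportException.h>

            using apache::thrift::transport::TTransportException;

            // Hypothetical helper: true only for errors we are confident were raised while
            // writing the request, i.e. before the peer could have processed the payload.
            // The real change pattern-matches errors raised by Thrift's TSocket::write_partial().
            static bool IsSendSideConnReset(const TTransportException& e) {
              const char* msg = e.what();
              return std::strstr(msg, "ECONNRESET") != nullptr ||
                     std::strstr(msg, "Broken pipe") != nullptr;
            }

            // Hypothetical policy: with SSL enabled we cannot tell send failures from recv
            // failures (both surface as TSSLException), so never retry in that case.
            static bool ShouldRetryRpc(const TTransportException& e, bool ssl_enabled) {
              if (ssl_enabled) return false;
              return IsSendSideConnReset(e);
            }

            int main() {
              TTransportException reset(TTransportException::NOT_OPEN, "write_partial: ECONNRESET");
              bool retry_plain = ShouldRetryRpc(reset, /*ssl_enabled=*/false);  // true: payload never left the socket
              bool retry_ssl = ShouldRetryRpc(reset, /*ssl_enabled=*/true);     // false: never retry under SSL
              return (retry_plain && !retry_ssl) ? 0 : 1;
            }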


            People

            • Assignee:
              kwho Michael Ho
            • Reporter:
              mikesbrown Michael Brown