IMPALA-2403: Replace BlockingQueue with a concurrent version so that readers and writers don't block

Details

    Description

      VTune profile data shows that BlockingQueue is a potential bottleneck in the scan code.
      BlockingQueue should be replaced with a queue that allows readers and writers to operate concurrently.

      Data Of Interest (CPU Metrics)
      2 of 14: 30.6% (1.309s of 4.279s)
      
      libpthread.so.0!pthread_cond_signal - [Unknown]
      impalad!impala::ExecNode::RowBatchQueue::GetBatch+0x158 - [Unknown]:[Unknown]
      impalad!impala::HdfsScanNode::GetNextInternal+0xf0 - [Unknown]:[Unknown]
      impalad!impala::HdfsScanNode::GetNext+0x2fc - [Unknown]:[Unknown]
      impalad!impala::PartitionedAggregationNode::Open+0x344 - [Unknown]:[Unknown]
      impalad!impala::PlanFragmentExecutor::OpenInternal+0x92 - [Unknown]:[Unknown]
      impalad!impala::PlanFragmentExecutor::Open+0x2d8 - [Unknown]:[Unknown]
      impalad!impala::FragmentMgr::FragmentExecState::Exec+0x1f - [Unknown]:[Unknown]
      impalad!impala::FragmentMgr::FragmentExecThread+0x4b - [Unknown]:[Unknown]
      impalad!impala::Thread::SuperviseThread+0x1b9 - [Unknown]:[Unknown]
      impalad!boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void (void)>, impala::Promise<long>*), boost::_bi::list4<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void (void)>>, boost::_bi::value<impala::Promise<long>*>>>>::run+0x7f - [Unknown]:[Unknown]
      impalad!func@0xd18ce0+0x62 - [Unknown]:[Unknown]
      libpthread.so.0!start_thread+0xd0 - [Unknown]:[Unknown]
      libc.so.6!clone+0x6c - [Unknown]:[Unknown]
      

      Currently the queue size is based on the expression below; this should be revisited when working on the new version of the queue:
      10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS)

      RowBatch* ExecNode::RowBatchQueue::GetBatch() {
        RowBatch* result = NULL;
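        // BlockingGet() blocks until a batch is available; a false return (e.g. the queue was shut down) yields NULL.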
        if (BlockingGet(&result)) return result;
        return NULL;
      }
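
      One shape the replacement could take (a minimal sketch, assuming a classic
      Michael & Scott two-lock design is acceptable; the class and method names are
      illustrative, not Impala's) keeps producers and the consumer on separate
      mutexes, so Put() and TryGet() do not serialize against each other:

      // Sketch only: capacity limits, blocking and shutdown are omitted for
      // brevity, but the real replacement would need all three.
      #include <atomic>
      #include <mutex>
      #include <utility>

      template <typename T>
      class TwoLockQueue {
        struct Node {
          T value;
          std::atomic<Node*> next{nullptr};
        };

       public:
        TwoLockQueue() : head_(new Node()), tail_(head_) {}  // starts with a dummy node

        ~TwoLockQueue() {
          Node* n = head_;
          while (n != nullptr) {
            Node* next = n->next.load(std::memory_order_relaxed);
            delete n;
            n = next;
          }
        }

        // Producers only take tail_lock_.
        void Put(T value) {
          Node* node = new Node();
          node->value = std::move(value);
          std::lock_guard<std::mutex> l(tail_lock_);
          tail_->next.store(node, std::memory_order_release);
          tail_ = node;
        }

        // The consumer only takes head_lock_. Returns false if the queue is empty.
        bool TryGet(T* out) {
          std::lock_guard<std::mutex> l(head_lock_);
          Node* first = head_->next.load(std::memory_order_acquire);
          if (first == nullptr) return false;
          *out = std::move(first->value);
          Node* old_dummy = head_;
          head_ = first;  // 'first' becomes the new dummy node
          delete old_dummy;
          return true;
        }

       private:
        Node* head_;  // dummy node; head_->next is the oldest element
        Node* tail_;  // last node; equals head_ when the queue is empty
        std::mutex head_lock_;
        std::mutex tail_lock_;
      };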
      

          Activity

            tarmstrong Tim Armstrong added a comment -

            We recently saw a case where a minor change to an earlier version of the code gave a big speedup:

            http://gerrit.cloudera.org:8080/#/c/741/

            tlipcon Todd Lipcon added a comment -

            A small and easy improvement might be to switch to pthread condvar/mutexes instead of boost – last time we looked at the boost mutex/condvar impl, it seemed like there was a lot of inefficiency in there in order to implement cancellation in a portable way (resulting in an extra pair of lock/unlocks). Kudu has mutex/condvar wrappers if you want to grab them.
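
            For reference, a thin pthread-based wrapper of the sort described could look
            roughly like the sketch below (hypothetical names; Kudu's actual Mutex/CondVar
            classes may differ in detail). Unlike the boost types, there is none of the
            cancellation bookkeeping mentioned above around the wait:

            #include <pthread.h>

            class Mutex {
             public:
              Mutex() { pthread_mutex_init(&m_, nullptr); }
              ~Mutex() { pthread_mutex_destroy(&m_); }
              void Lock() { pthread_mutex_lock(&m_); }
              void Unlock() { pthread_mutex_unlock(&m_); }
              pthread_mutex_t* raw() { return &m_; }

             private:
              pthread_mutex_t m_;
            };

            class CondVar {
             public:
              CondVar() { pthread_cond_init(&c_, nullptr); }
              ~CondVar() { pthread_cond_destroy(&c_); }
              // Caller must hold 'mutex'; it is released atomically while waiting.
              void Wait(Mutex* mutex) { pthread_cond_wait(&c_, mutex->raw()); }
              void Signal() { pthread_cond_signal(&c_); }
              void Broadcast() { pthread_cond_broadcast(&c_); }

             private:
              pthread_cond_t c_;
            };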

            dhecht Daniel Hecht added a comment -

            mmokhtar, kwho, can you summarize where we're at on this one?

            kwho Michael Ho added a comment -

            I have a patch which implements the double-buffering idea at http://github.mtv.cloudera.com/kwho/Impala/commit/9a146097e232e2894fd350f3c262a0d4e8756c94,
            but I have yet to find a workload which shows the benefit. I wasn't able to reproduce the lock contention shown in this JIRA even on a 10-node cluster.
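
            For context, one common reading of the double-buffering idea (the linked
            patch may well differ in detail; this is only an illustrative sketch) is
            to let producers append to a shared buffer under the lock while the single
            consumer drains a private buffer lock-free, swapping the two only when the
            private side runs dry:

            #include <condition_variable>
            #include <deque>
            #include <mutex>

            template <typename T>
            class DoubleBufferedQueue {
             public:
              // Producers: append under the lock.
              void Put(T item) {
                std::lock_guard<std::mutex> l(lock_);
                write_buf_.push_back(std::move(item));
                not_empty_.notify_one();
              }

              // Single consumer: blocks until an element is available.
              T Get() {
                if (read_buf_.empty()) {
                  std::unique_lock<std::mutex> l(lock_);
                  not_empty_.wait(l, [this] { return !write_buf_.empty(); });
                  read_buf_.swap(write_buf_);  // one lock acquisition picks up many Puts
                }
                T item = std::move(read_buf_.front());
                read_buf_.pop_front();
                return item;
              }

             private:
              std::mutex lock_;
              std::condition_variable not_empty_;
              std::deque<T> write_buf_;  // shared with producers, guarded by lock_
              std::deque<T> read_buf_;   // consumer-private, accessed without lock_
            };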


            mmokhtar Mostafa Mokhtar added a comment -

            Can you try collecting a VTune profile using advanced-hotspots for a big scan query?

            kwho Michael Ho added a comment -

            I will probably have to coordinate with you on that, as I was using perf and I think only one of the clusters has VTune set up, right?


            mmokhtar Mostafa Mokhtar added a comment -

            You should be able to install VTune on any node; perf doesn't show inlined functions or spinlocks.

            tlipcon Todd Lipcon added a comment -

            I've looked at perf profiles of simple scan queries quite a bit while trying to optimize Kudu on Impala. I used to see the contention mentioned here, but fixing the row batch queue to use deque instead of list made it all go away (http://gerrit.cloudera.org:8080/741). The old code was doing an O(N) list traversal (including N basically guaranteed cache misses) while holding the lock, so it generated lots of contention. The new code is O(1) inside the lock, so it isn't really an issue anymore.
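
            To illustrate the failure mode being described (this is a sketch, not the
            actual patch): with some pre-C++11 std::list implementations, size() walks
            every node, so checking the capacity while holding the queue lock turns each
            operation into an O(N) critical section full of cache misses. A deque keeps
            the work inside the lock O(1):

            #include <deque>
            #include <mutex>

            template <typename T>
            class BoundedQueueSketch {
             public:
              explicit BoundedQueueSketch(size_t capacity) : capacity_(capacity) {}

              bool TryPut(T item) {
                std::lock_guard<std::mutex> l(lock_);
                // O(1): deque::size() is constant time, so no node chasing happens
                // while the lock is held.
                if (queue_.size() >= capacity_) return false;
                queue_.push_back(std::move(item));
                return true;
              }

              bool TryGet(T* out) {
                std::lock_guard<std::mutex> l(lock_);
                if (queue_.empty()) return false;
                *out = std::move(queue_.front());
                queue_.pop_front();  // also O(1) inside the critical section
                return true;
              }

             private:
              const size_t capacity_;
              std::mutex lock_;
              std::deque<T> queue_;
            };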

            kwho Michael Ho added a comment -

            mmokhtar and I looked at the VTune output again. It appears that the actual problem is not lock contention; instead, the scanner threads were blocked for a long time waiting on the blocking queue's condition variable for a slot to open up.

            One way to alleviate the problem is to increase the size of the blocking queue, but this only delays the problem. Fundamentally, we have only one thread pulling row batches off the queue but multiple threads producing them, and this asymmetry creates the imbalance.

            I tried switching to a POSIX mutex and condition variable, but no obvious performance improvement was observed. Please feel free to reopen this JIRA if there are other ideas we can try.

            helifu LiFu He added a comment -

            I compared the Kudu scanner to the HDFS scanner and found that 'RowBatchQueuePutWaitTime' took too much time.

            parquet:

                    HASH_JOIN_NODE (id=6):(Total: 3m41s, non-child: 3m24s, % non-child: 92.34%)
                      ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously
                       - BuildRows: 227.55M (227546432)
                       - BuildTime: 1m15s
                       - NumHashTableBuildsSkipped: 0 (0)
                       - PeakMemoryUsage: 17.14 GB (18399351552)
                       - ProbeRows: 2.00B (2002021102)
                       - ProbeRowsPartitioned: 0 (0)
                       - ProbeTime: 2m5s
                       - RowsReturned: 303.70M (303696158)
                       - RowsReturnedRate: 1.37 M/sec <<<-----
                      Hash Join Builder (join_node_id=6):
                        ExecOption: Build Side Codegen Enabled, Hash Table Construction Codegen Enabled
                        Runtime filters: 0 of 1 Runtime Filter Published, 1 Disabled
                         - BuildRowsPartitionTime: 54s216ms
                         - BuildRowsPartitioned: 227.55M (227546432)
                         - GetNewBlockTime: 12.848ms
                         - HashBuckets: 536.87M (536870912)
                         - HashCollisions: 0 (0)
                         - HashTablesBuildTime: 21s018ms
                         - LargestPartitionPercent: 6 (6)
                         - MaxPartitionLevel: 0 (0)
                         - NumRepartitions: 0 (0)
                         - PartitionsCreated: 16 (16)
                         - PeakMemoryUsage: 17.13 GB (18397287552)
                         - PinTime: 0.000ns
                         - RepartitionTime: 0.000ns
                         - SpilledPartitions: 0 (0)
                         - UnpinTime: 0.000ns
                      EXCHANGE_NODE (id=13):(Total: 9s405ms, non-child: 9s053ms, % non-child: 96.25%)
                        BytesReceived(8s000ms): 207.33 MB, 783.65 MB, 1.34 GB, 1.94 GB, 2.51 GB, 3.10 GB, 3.67 GB, 4.27 GB, 4.65 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB
                         - BytesReceived: 4.66 GB (5004716793)
                         - ConvertRowBatchTime: 5s146ms
                         - DeserializeRowBatchTimer: 19s380ms
                         - FirstBatchArrivalWaitTime: 0.000ns
                         - PeakMemoryUsage: 0
                         - RowsReturned: 227.55M (227546432)
                         - RowsReturnedRate: 24.19 M/sec
                         - SendersBlockedTimer: 17s512ms
                         - SendersBlockedTotalTimer(*): 40s412ms
                    HDFS_SCAN_NODE (id=2):(Total: 7s537ms, non-child: 7s537ms, % non-child: 100.00%)
                      Hdfs split stats (<volume id>:<# splits>/<split lengths>): 0:45/11.12 GB 4:100/24.71 GB 2:46/11.37 GB 1:52/12.85 GB 5:51/12.60 GB 
                      ExecOption: PARQUET Codegen Enabled, Codegen enabled: 294 out of 294
                      Runtime filters: All filters arrived. Waited 0
                      Hdfs Read Thread Concurrency Bucket: 0:95.99% 1:2.139% 2:0.4011% 3:0.4011% 4:0.4011% 5:0.6684% 6:0% 7:0% 8:0% 9:0% 10:0% 
                      File Formats: PARQUET/SNAPPY:1176 
                      BytesRead(8s000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 335.98 MB, 3.33 GB, 3.33 GB, 3.33 GB, 3.33 GB, 4.19 GB, 6.65 GB, 6.65 GB, 6.65 GB, 6.65 GB, 8.02 GB, 9.98 GB, 9.98 GB, 9.98 GB, 9.98 GB, 11.82 GB, 13.31 GB, 13.31 GB, 13.31 GB, 13.31 GB, 15.17 GB, 16.64 GB, 16.64 GB, 16.64 GB, 16.64 GB, 18.57 GB, 19.96 GB, 19.96 GB, 19.96 GB, 19.96 GB, 21.80 GB, 23.29 GB, 23.29 GB, 23.29 GB, 23.29 GB, 24.28 GB
                       - FooterProcessingTime: (Avg: 32.080ms ; Min: 975.625us ; Max: 708.559ms ; Number of samples: 294)
                       - AverageHdfsReadThreadConcurrency: 0.09 
                       - AverageScannerThreadConcurrency: 29.99 
                       - BytesRead: 24.45 GB (26258204410)
                       - BytesReadDataNodeCache: 0
                       - BytesReadLocal: 24.45 GB (26258204410)
                       - BytesReadRemoteUnexpected: 0
                       - BytesReadShortCircuit: 24.45 GB (26258204410)
                       - DecompressionTime: 1m58s
                       - MaxCompressedTextFileLength: 0
                       - NumColumns: 4 (4)
                       - NumDictFilteredRowGroups: 0 (0)
                       - NumDisksAccessed: 5 (5)
                       - NumRowGroups: 294 (294)
                       - NumScannerThreadsStarted: 40 (40)
                       - NumScannersWithNoReads: 0 (0)
                       - NumStatsFilteredRowGroups: 0 (0)
                       - PeakMemoryUsage: 3.61 GB (3872112578)
                       - PerReadThreadRawHdfsThroughput: 797.92 MB/sec
                       - RemoteScanRanges: 0 (0)
                       - RowBatchQueueGetWaitTime: 49.408ms <<<-----
                       - RowBatchQueuePutWaitTime: 3h <<<-----
                       - RowsRead: 2.00B (2002021102)
                       - RowsReturned: 2.00B (2002021102)
                       - RowsReturnedRate: 265.62 M/sec
                       - ScanRangesComplete: 294 (294)
                       - ScannerThreadsInvoluntaryContextSwitches: 3.65K (3655)
                       - ScannerThreadsTotalWallClockTime: 3h6m
                         - MaterializeTupleTime(*): 3m32s
                         - ScannerThreadsSysTime: 48s120ms
                         - ScannerThreadsUserTime: 4m59s
                       - ScannerThreadsVoluntaryContextSwitches: 1.97M (1972275)
                       - TotalRawHdfsReadTime(*): 31s383ms
                       - TotalReadThroughput: 66.69 MB/sec
                      Filter 4 (4.00 MB):
                         - Rows processed: 4.82M (4816896)
                         - Rows rejected: 0 (0)
                         - Rows total: 2.00B (2002021102)
                      Filter 5 (16.00 MB):
                         - Rows processed: 4.82M (4816896)
                         - Rows rejected: 0 (0)
                         - Rows total: 2.00B (2002021102)
            

            kudu:

                    HASH_JOIN_NODE (id=6):(Total: 6m29s, non-child: 6m12s, % non-child: 95.83%)
                      ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously
                       - BuildRows: 227.55M (227546432)
                       - BuildTime: 53s604ms
                       - NumHashTableBuildsSkipped: 0 (0)
                       - PeakMemoryUsage: 11.63 GB (12491762944)
                       - ProbeRows: 2.00B (2000049901)
                       - ProbeRowsPartitioned: 0 (0)
                       - ProbeTime: 5m15s
                       - RowsReturned: 303.39M (303386833)
                       - RowsReturnedRate: 779.91 K/sec <<<-----
                      Hash Join Builder (join_node_id=6):
                        ExecOption: Build Side Codegen Enabled, Hash Table Construction Codegen Enabled
                        Runtime filters: 0 of 1 Runtime Filter Published, 1 Disabled
                         - BuildRowsPartitionTime: 35s049ms
                         - BuildRowsPartitioned: 227.55M (227546432)
                         - GetNewBlockTime: 13.744ms
                         - HashBuckets: 536.87M (536870912)
                         - HashCollisions: 0 (0)
                         - HashTablesBuildTime: 18s507ms
                         - LargestPartitionPercent: 6 (6)
                         - MaxPartitionLevel: 0 (0)
                         - NumRepartitions: 0 (0)
                         - PartitionsCreated: 16 (16)
                         - PeakMemoryUsage: 11.63 GB (12491707520)
                         - PinTime: 0.000ns
                         - RepartitionTime: 0.000ns
                         - SpilledPartitions: 0 (0)
                         - UnpinTime: 0.000ns
                      EXCHANGE_NODE (id=13):(Total: 7s265ms, non-child: 6s330ms, % non-child: 87.14%)
                        BytesReceived(16s000ms): 525.45 MB, 1.71 GB, 2.86 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB
                         - BytesReceived: 3.15 GB (3383413889)
                         - ConvertRowBatchTime: 4s846ms
                         - DeserializeRowBatchTimer: 8s021ms
                         - FirstBatchArrivalWaitTime: 0.000ns
                         - PeakMemoryUsage: 0
                         - RowsReturned: 227.55M (227546432)
                         - RowsReturnedRate: 31.32 M/sec
                         - SendersBlockedTimer: 11s284ms
                         - SendersBlockedTotalTimer(*): 22s888ms
                    KUDU_SCAN_NODE (id=2):(Total: 8s961ms, non-child: 8s961ms, % non-child: 100.00%)
                      Runtime filters: All filters arrived. Waited 0
                      BytesRead(16s000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
                       - BytesRead: 0
                       - KuduRemoteScanTokens: 0 (0)
                       - NumScannerThreadsStarted: 40 (40)
                       - PeakMemoryUsage: 5.65 MB (5919744)
                       - RowBatchQueueGetWaitTime: 0.000ns <<<-----
                       - RowBatchQueuePutWaitTime: 5h53m <<<-----
                       - RowsRead: 2.00B (2000049901)
                       - RowsReturned: 2.00B (2000049901)
                       - RowsReturnedRate: 223.18 M/sec
                       - ScanRangesComplete: 40 (40)
                       - ScannerThreadsInvoluntaryContextSwitches: 2.25K (2251)
                       - ScannerThreadsTotalWallClockTime: 6h2m
                         - MaterializeTupleTime(*): 8s754ms
                         - ScannerThreadsSysTime: 56s576ms
                         - ScannerThreadsUserTime: 2m8s
                       - ScannerThreadsVoluntaryContextSwitches: 1.99M (1993935)
                       - TotalKuduScanRoundTrips: 7.40K (7403)
                       - TotalReadThroughput: 0.00 /sec
                      Filter 4 (4.00 MB):
                         - Rows processed: 0 (0)
                         - Rows rejected: 0 (0)
                         - Rows total: 0 (0)
                      Filter 5 (16.00 MB):
                         - Rows processed: 0 (0)
                         - Rows rejected: 0 (0)
                         - Rows total: 0 (0)
            
            tarmstrong Tim Armstrong added a comment -

            helifu the RowBatchQueuePutWaitTime could indicate that the queue was full, which would mean that the Kudu scanners were adding data to the queue faster than the consumer could remove it, or it could indicate that the CPU is oversubscribed and the Kudu scanners are context-switching when blocked on the queue.

            helifu LiFu He added a comment -

            Thanks, Tim. Your reply is correct. Maybe I will try to dig into https://issues.apache.org/jira/browse/IMPALA-2797

            kwho Michael Ho added a comment -

            helifu, what you are seeing is an indication that the scan node was producing row batches faster than the consumer of the scan node (which appears to be the probe side of the PHJ node, if I read the profile correctly) could consume them. In particular, the probe time in the hash join node differs a lot between the two profiles, which is consistent with the observation that the amount of time scanner threads in the Kudu scan node spent blocked waiting to insert into the row batch queue is much larger than in the HDFS scan node. While one may argue that IMPALA-2797 would help by throttling the number of active scanner threads so the main execution thread can get more CPU time, I am not entirely convinced that is the problem here, because the HDFS scan node has the same level of imbalance and its consumer doesn't suffer as much, unless the Kudu scan node's tuple materialization is much more CPU intensive, which doesn't seem to be the case according to ScannerThreadsUserTime or MaterializeTupleTime. So there seems to be something else wrong here with the PHJ probing.

            Another key thing to note is that once the row batch queue in the scan node fills up, it's already implicitly throttling all scanner threads so the main execution thread shouldn't be drowned out unless there is something else running on the host at that time.

            Longer term, the imbalance between the scanner threads and the main execution thread should go away once we move over to multi-threading (IMPALA-3902).

                    HASH_JOIN_NODE (id=6):(Total: 3m41s, non-child: 3m24s, % non-child: 92.34%)
                      ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously
                       - BuildRows: 227.55M (227546432)
                       - BuildTime: 1m15s
                       - NumHashTableBuildsSkipped: 0 (0)
                       - PeakMemoryUsage: 17.14 GB (18399351552)
                       - ProbeRows: 2.00B (2002021102)
                       - ProbeRowsPartitioned: 0 (0)
                       - ProbeTime: 2m5s <<<-----
            

            vs

                    HASH_JOIN_NODE (id=6):(Total: 6m29s, non-child: 6m12s, % non-child: 95.83%)
                      ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously
                       - BuildRows: 227.55M (227546432)
                       - BuildTime: 53s604ms
                       - NumHashTableBuildsSkipped: 0 (0)
                       - PeakMemoryUsage: 11.63 GB (12491762944)
                       - ProbeRows: 2.00B (2000049901)
                       - ProbeRowsPartitioned: 0 (0)
                       - ProbeTime: 5m15s <<------
            

            People

              kwho Michael Ho
              mmokhtar Mostafa Mokhtar