Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Cannot Reproduce
Affects Version: Impala 2.3.0
Description
VTune profile data shows that BlockingQueue is a potential bottleneck in the scan code.
BlockingQueue should be replaced with a queue that allows readers and writers to operate concurrently.
Data Of Interest (CPU Metrics) 2 of 14: 30.6% (1.309s of 4.279s)
libpthread.so.0!pthread_cond_signal - [Unknown]
impalad!impala::ExecNode::RowBatchQueue::GetBatch+0x158 - [Unknown]:[Unknown]
impalad!impala::HdfsScanNode::GetNextInternal+0xf0 - [Unknown]:[Unknown]
impalad!impala::HdfsScanNode::GetNext+0x2fc - [Unknown]:[Unknown]
impalad!impala::PartitionedAggregationNode::Open+0x344 - [Unknown]:[Unknown]
impalad!impala::PlanFragmentExecutor::OpenInternal+0x92 - [Unknown]:[Unknown]
impalad!impala::PlanFragmentExecutor::Open+0x2d8 - [Unknown]:[Unknown]
impalad!impala::FragmentMgr::FragmentExecState::Exec+0x1f - [Unknown]:[Unknown]
impalad!impala::FragmentMgr::FragmentExecThread+0x4b - [Unknown]:[Unknown]
impalad!impala::Thread::SuperviseThread+0x1b9 - [Unknown]:[Unknown]
impalad!boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::string const&, std::string const&, boost::function<void (void)>, impala::Promise<long>*), boost::_bi::list4<boost::_bi::value<std::string>, boost::_bi::value<std::string>, boost::_bi::value<boost::function<void (void)>>, boost::_bi::value<impala::Promise<long>*>>>>::run+0x7f - [Unknown]:[Unknown]
impalad!func@0xd18ce0+0x62 - [Unknown]:[Unknown]
libpthread.so.0!start_thread+0xd0 - [Unknown]:[Unknown]
libc.so.6!clone+0x6c - [Unknown]:[Unknown]
Currently the queue size is based on the expression below; this should be revisited when working on the new version of the queue:
10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS)
RowBatch* ExecNode::RowBatchQueue::GetBatch() {
  RowBatch* result = NULL;
  if (BlockingGet(&result)) return result;
  return NULL;
}
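As a rough illustration of the kind of queue the description asks for (readers and writers operating concurrently), the sketch below uses two locks in the style of Java's LinkedBlockingQueue: producers append at the tail under a put lock while the single consumer pops from the head under a separate get lock, so the two sides rarely contend. The class and member names are hypothetical and this is not Impala's BlockingQueue; it is only a sketch of the idea.
{code:cpp}
// Hypothetical two-lock bounded queue (not Impala's BlockingQueue): producers
// only take put_lock_ and touch the tail, the consumer only takes get_lock_
// and touches the head, with an atomic count coordinating the two sides.
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <utility>

template <typename T>
class TwoLockBlockingQueue {
  struct Node {
    T value;
    Node* next;
  };

 public:
  explicit TwoLockBlockingQueue(int capacity)
      : capacity_(capacity), head_(new Node()), tail_(head_) {}

  ~TwoLockBlockingQueue() {
    while (head_ != nullptr) {
      Node* next = head_->next;
      delete head_;
      head_ = next;
    }
  }

  // Blocks while the queue is full.
  void Put(T value) {
    Node* node = new Node{std::move(value), nullptr};
    std::unique_lock<std::mutex> put_lock(put_lock_);
    not_full_.wait(put_lock, [this] { return count_.load() < capacity_; });
    tail_->next = node;  // Only producers touch tail_.
    tail_ = node;
    int old_count = count_.fetch_add(1);
    // Cascade the signal to other waiting producers if there is still room.
    if (old_count + 1 < capacity_) not_full_.notify_one();
    put_lock.unlock();
    if (old_count == 0) {
      // Queue was empty: wake the blocked consumer.
      std::lock_guard<std::mutex> get_lock(get_lock_);
      not_empty_.notify_one();
    }
  }

  // Blocks while the queue is empty.
  T Get() {
    std::unique_lock<std::mutex> get_lock(get_lock_);
    not_empty_.wait(get_lock, [this] { return count_.load() > 0; });
    Node* old_head = head_;  // head_ is a dummy node; the value lives in head_->next.
    head_ = head_->next;
    T value = std::move(head_->value);
    int old_count = count_.fetch_sub(1);
    if (old_count > 1) not_empty_.notify_one();
    get_lock.unlock();
    delete old_head;
    if (old_count == capacity_) {
      // Queue was full: wake a blocked producer.
      std::lock_guard<std::mutex> put_lock(put_lock_);
      not_full_.notify_one();
    }
    return value;
  }

 private:
  const int capacity_;
  std::atomic<int> count_{0};
  std::mutex put_lock_, get_lock_;
  std::condition_variable not_full_, not_empty_;
  Node* head_;  // Guarded by get_lock_ (dummy node).
  Node* tail_;  // Guarded by put_lock_.
};
{code}
With this layout the consumer signals and takes the put-side lock only when it frees a slot in a previously full queue, and it does not share a mutex with producers for the queue manipulation itself.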
Attachments
Issue Links
is related to:
- IMPALA-4026 Investigate regression introduced by "IMPALA-3629: Codegen TransferScratchTuples" (Resolved)
- IMPALA-2712 Investigate increasing batch size (Resolved)
Activity
A small and easy improvement might be to switch to pthread condvar/mutexes instead of boost – last time we looked at the boost mutex/condvar impl, it seemed like there was a lot of inefficiency in there in order to implement cancellation in a portable way (resulting in an extra pair of lock/unlocks). Kudu has mutex/condvar wrappers if you want to grab them.
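For reference, a minimal sketch of that swap, assuming a raw pthread_mutex_t/pthread_cond_t pair in place of boost::mutex/boost::condition_variable; the names are hypothetical and the shutdown/cancellation handling of the real BlockingQueue is omitted:
{code:cpp}
// Sketch of a blocking queue built directly on pthread primitives instead of
// boost::mutex/boost::condition_variable (hypothetical; no shutdown handling).
#include <pthread.h>
#include <deque>

template <typename T>
class PthreadBlockingQueue {
 public:
  PthreadBlockingQueue() {
    pthread_mutex_init(&lock_, nullptr);
    pthread_cond_init(&not_empty_, nullptr);
  }

  ~PthreadBlockingQueue() {
    pthread_cond_destroy(&not_empty_);
    pthread_mutex_destroy(&lock_);
  }

  void Put(const T& item) {
    pthread_mutex_lock(&lock_);
    queue_.push_back(item);
    pthread_cond_signal(&not_empty_);
    pthread_mutex_unlock(&lock_);
  }

  T Get() {
    pthread_mutex_lock(&lock_);
    while (queue_.empty()) {
      // Atomically releases lock_ and sleeps; reacquires lock_ on wakeup.
      pthread_cond_wait(&not_empty_, &lock_);
    }
    T item = queue_.front();
    queue_.pop_front();
    pthread_mutex_unlock(&lock_);
    return item;
  }

 private:
  pthread_mutex_t lock_;
  pthread_cond_t not_empty_;
  std::deque<T> queue_;
};
{code}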
I have a patch that implements the double-buffering idea at http://github.mtv.cloudera.com/kwho/Impala/commit/9a146097e232e2894fd350f3c262a0d4e8756c94, but I have yet to find a workload that shows the benefit. I wasn't able to reproduce the lock contention shown in this JIRA, even on the 10-node cluster.
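For context, the double-buffering idea can be sketched roughly as follows (an illustration under assumed names such as DoubleBufferedQueue, not the actual patch linked above): producers append to a write buffer under the lock, while the single consumer drains a private read buffer without locking and takes the lock only to swap the two buffers when its side runs dry.
{code:cpp}
// Rough sketch of double buffering for a single-consumer queue (hypothetical;
// unbounded and without shutdown handling, unlike a real row batch queue).
#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

template <typename T>
class DoubleBufferedQueue {
 public:
  void Put(T item) {
    std::lock_guard<std::mutex> l(lock_);
    write_buf_.push_back(std::move(item));
    not_empty_.notify_one();
  }

  // Single consumer only: returns the next item, blocking if both buffers are empty.
  T Get() {
    if (read_pos_ == read_buf_.size()) {
      // Read buffer exhausted: swap it with the write buffer under the lock.
      std::unique_lock<std::mutex> l(lock_);
      not_empty_.wait(l, [this] { return !write_buf_.empty(); });
      read_buf_.clear();
      read_buf_.swap(write_buf_);
      read_pos_ = 0;
    }
    // No lock needed here: read_buf_ is private to the consumer after the swap.
    return std::move(read_buf_[read_pos_++]);
  }

 private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::vector<T> write_buf_;  // Guarded by lock_; producers append here.
  std::vector<T> read_buf_;   // Consumer-private between swaps.
  size_t read_pos_ = 0;
};
{code}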
Can you try collecting a VTune profile using advanced-hotspots for a big scan query?
I will probably have to coordinate with you on that, as I was using perf and I think only one of the clusters has VTune set up, right?
You should be able to install VTune on any node; perf doesn't show inlined functions or spinlocks.
I've looked at perf profiles of simple scan queries quite a bit while trying to optimize Kudu on Impala. I used to see the contention mentioned here, but fixing the row batch queue to use a deque instead of a list made it all go away (http://gerrit.cloudera.org:8080/741). The old code was doing an O(N) list traversal (including N basically guaranteed cache misses) while holding the lock, which generated lots of contention. The new code is O(1) inside the lock, so it isn't really an issue.
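To illustrate the point (this is not the actual change linked above): the fix is about keeping the critical section O(1), e.g. a std::deque that is only pushed and popped at its ends while the lock is held, instead of doing an O(N) list walk inside the lock. The class name below is hypothetical.
{code:cpp}
// Illustration only: with a std::deque, the work done while holding the lock
// is a constant-time push/pop, so the critical section stays short and the
// lock sees far less contention than when an O(N) list walk happened inside it.
#include <deque>
#include <mutex>
#include <utility>

template <typename T>
class ShortCriticalSectionQueue {
 public:
  void Push(T v) {
    std::lock_guard<std::mutex> l(lock_);
    q_.push_back(std::move(v));    // O(1) under the lock.
  }

  bool Pop(T* out) {
    std::lock_guard<std::mutex> l(lock_);
    if (q_.empty()) return false;
    *out = std::move(q_.front());  // O(1): no traversal in the critical section.
    q_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::deque<T> q_;  // Previously a std::list in the old queue code.
};
{code}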
mmokhtar and I looked at the VTune output again. It appears that the actual problem is not lock contention; rather, the scanner threads were blocked for a long time waiting on the blocking queue's condition variable for a slot to open up.
One way to alleviate the problem is to increase the size of the blocking queue, but that only delays the problem. Fundamentally, we have only one thread pulling row batches off the queue but multiple threads producing them, and this asymmetry creates the imbalance.
I tried switching to a POSIX mutex and condition variable, but no obvious performance improvement was observed. Please feel free to reopen this JIRA if there are other ideas we can try.
I compared KuduScanner to HdfsScanner and found that 'RowBatchQueuePutWaitTime' took too much time.
parquet:
HASH_JOIN_NODE (id=6):(Total: 3m41s, non-child: 3m24s, % non-child: 92.34%) ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously - BuildRows: 227.55M (227546432) - BuildTime: 1m15s - NumHashTableBuildsSkipped: 0 (0) - PeakMemoryUsage: 17.14 GB (18399351552) - ProbeRows: 2.00B (2002021102) - ProbeRowsPartitioned: 0 (0) - ProbeTime: 2m5s - RowsReturned: 303.70M (303696158) - {color:red}RowsReturnedRate: 1.37 M/sec{color} Hash Join Builder (join_node_id=6): ExecOption: Build Side Codegen Enabled, Hash Table Construction Codegen Enabled Runtime filters: 0 of 1 Runtime Filter Published, 1 Disabled - BuildRowsPartitionTime: 54s216ms - BuildRowsPartitioned: 227.55M (227546432) - GetNewBlockTime: 12.848ms - HashBuckets: 536.87M (536870912) - HashCollisions: 0 (0) - HashTablesBuildTime: 21s018ms - LargestPartitionPercent: 6 (6) - MaxPartitionLevel: 0 (0) - NumRepartitions: 0 (0) - PartitionsCreated: 16 (16) - PeakMemoryUsage: 17.13 GB (18397287552) - PinTime: 0.000ns - RepartitionTime: 0.000ns - SpilledPartitions: 0 (0) - UnpinTime: 0.000ns EXCHANGE_NODE (id=13):(Total: 9s405ms, non-child: 9s053ms, % non-child: 96.25%) BytesReceived(8s000ms): 207.33 MB, 783.65 MB, 1.34 GB, 1.94 GB, 2.51 GB, 3.10 GB, 3.67 GB, 4.27 GB, 4.65 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB, 4.66 GB - BytesReceived: 4.66 GB (5004716793) - ConvertRowBatchTime: 5s146ms - DeserializeRowBatchTimer: 19s380ms - FirstBatchArrivalWaitTime: 0.000ns - PeakMemoryUsage: 0 - RowsReturned: 227.55M (227546432) - RowsReturnedRate: 24.19 M/sec - SendersBlockedTimer: 17s512ms - SendersBlockedTotalTimer(*): 40s412ms HDFS_SCAN_NODE (id=2):(Total: 7s537ms, non-child: 7s537ms, % non-child: 100.00%) Hdfs split stats (<volume id>:<# splits>/<split lengths>): 0:45/11.12 GB 4:100/24.71 GB 2:46/11.37 GB 1:52/12.85 GB 5:51/12.60 GB ExecOption: PARQUET Codegen Enabled, Codegen enabled: 294 out of 294 Runtime filters: All filters arrived. 
Waited 0 Hdfs Read Thread Concurrency Bucket: 0:95.99% 1:2.139% 2:0.4011% 3:0.4011% 4:0.4011% 5:0.6684% 6:0% 7:0% 8:0% 9:0% 10:0% File Formats: PARQUET/SNAPPY:1176 BytesRead(8s000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 335.98 MB, 3.33 GB, 3.33 GB, 3.33 GB, 3.33 GB, 4.19 GB, 6.65 GB, 6.65 GB, 6.65 GB, 6.65 GB, 8.02 GB, 9.98 GB, 9.98 GB, 9.98 GB, 9.98 GB, 11.82 GB, 13.31 GB, 13.31 GB, 13.31 GB, 13.31 GB, 15.17 GB, 16.64 GB, 16.64 GB, 16.64 GB, 16.64 GB, 18.57 GB, 19.96 GB, 19.96 GB, 19.96 GB, 19.96 GB, 21.80 GB, 23.29 GB, 23.29 GB, 23.29 GB, 23.29 GB, 24.28 GB - FooterProcessingTime: (Avg: 32.080ms ; Min: 975.625us ; Max: 708.559ms ; Number of samples: 294) - AverageHdfsReadThreadConcurrency: 0.09 - AverageScannerThreadConcurrency: 29.99 - BytesRead: 24.45 GB (26258204410) - BytesReadDataNodeCache: 0 - BytesReadLocal: 24.45 GB (26258204410) - BytesReadRemoteUnexpected: 0 - BytesReadShortCircuit: 24.45 GB (26258204410) - DecompressionTime: 1m58s - MaxCompressedTextFileLength: 0 - NumColumns: 4 (4) - NumDictFilteredRowGroups: 0 (0) - NumDisksAccessed: 5 (5) - NumRowGroups: 294 (294) - NumScannerThreadsStarted: 40 (40) - NumScannersWithNoReads: 0 (0) - NumStatsFilteredRowGroups: 0 (0) - PeakMemoryUsage: 3.61 GB (3872112578) - PerReadThreadRawHdfsThroughput: 797.92 MB/sec - RemoteScanRanges: 0 (0) {color:red} - RowBatchQueueGetWaitTime: 49.408ms - RowBatchQueuePutWaitTime: 3h{color} - RowsRead: 2.00B (2002021102) - RowsReturned: 2.00B (2002021102) - RowsReturnedRate: 265.62 M/sec - ScanRangesComplete: 294 (294) - ScannerThreadsInvoluntaryContextSwitches: 3.65K (3655) - ScannerThreadsTotalWallClockTime: 3h6m - MaterializeTupleTime(*): 3m32s - ScannerThreadsSysTime: 48s120ms - ScannerThreadsUserTime: 4m59s - ScannerThreadsVoluntaryContextSwitches: 1.97M (1972275) - TotalRawHdfsReadTime(*): 31s383ms - TotalReadThroughput: 66.69 MB/sec Filter 4 (4.00 MB): - Rows processed: 4.82M (4816896) - Rows rejected: 0 (0) - Rows total: 2.00B (2002021102) Filter 5 (16.00 MB): - Rows processed: 4.82M (4816896) - Rows rejected: 0 (0) - Rows total: 2.00B (2002021102)
kudu:
HASH_JOIN_NODE (id=6):(Total: 6m29s, non-child: 6m12s, % non-child: 95.83%) ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously - BuildRows: 227.55M (227546432) - BuildTime: 53s604ms - NumHashTableBuildsSkipped: 0 (0) - PeakMemoryUsage: 11.63 GB (12491762944) - ProbeRows: 2.00B (2000049901) - ProbeRowsPartitioned: 0 (0) - ProbeTime: 5m15s - RowsReturned: 303.39M (303386833) {color:red} - RowsReturnedRate: 779.91 K/sec{color} Hash Join Builder (join_node_id=6): ExecOption: Build Side Codegen Enabled, Hash Table Construction Codegen Enabled Runtime filters: 0 of 1 Runtime Filter Published, 1 Disabled - BuildRowsPartitionTime: 35s049ms - BuildRowsPartitioned: 227.55M (227546432) - GetNewBlockTime: 13.744ms - HashBuckets: 536.87M (536870912) - HashCollisions: 0 (0) - HashTablesBuildTime: 18s507ms - LargestPartitionPercent: 6 (6) - MaxPartitionLevel: 0 (0) - NumRepartitions: 0 (0) - PartitionsCreated: 16 (16) - PeakMemoryUsage: 11.63 GB (12491707520) - PinTime: 0.000ns - RepartitionTime: 0.000ns - SpilledPartitions: 0 (0) - UnpinTime: 0.000ns EXCHANGE_NODE (id=13):(Total: 7s265ms, non-child: 6s330ms, % non-child: 87.14%) BytesReceived(16s000ms): 525.45 MB, 1.71 GB, 2.86 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB, 3.15 GB - BytesReceived: 3.15 GB (3383413889) - ConvertRowBatchTime: 4s846ms - DeserializeRowBatchTimer: 8s021ms - FirstBatchArrivalWaitTime: 0.000ns - PeakMemoryUsage: 0 - RowsReturned: 227.55M (227546432) - RowsReturnedRate: 31.32 M/sec - SendersBlockedTimer: 11s284ms - SendersBlockedTotalTimer(*): 22s888ms KUDU_SCAN_NODE (id=2):(Total: 8s961ms, non-child: 8s961ms, % non-child: 100.00%) Runtime filters: All filters arrived. Waited 0 BytesRead(16s000ms): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - BytesRead: 0 - KuduRemoteScanTokens: 0 (0) - NumScannerThreadsStarted: 40 (40) - PeakMemoryUsage: 5.65 MB (5919744) {color:red} - RowBatchQueueGetWaitTime: 0.000ns - RowBatchQueuePutWaitTime: 5h53m{color} - RowsRead: 2.00B (2000049901) - RowsReturned: 2.00B (2000049901) - RowsReturnedRate: 223.18 M/sec - ScanRangesComplete: 40 (40) - ScannerThreadsInvoluntaryContextSwitches: 2.25K (2251) - ScannerThreadsTotalWallClockTime: 6h2m - MaterializeTupleTime(*): 8s754ms - ScannerThreadsSysTime: 56s576ms - ScannerThreadsUserTime: 2m8s - ScannerThreadsVoluntaryContextSwitches: 1.99M (1993935) - TotalKuduScanRoundTrips: 7.40K (7403) - TotalReadThroughput: 0.00 /sec Filter 4 (4.00 MB): - Rows processed: 0 (0) - Rows rejected: 0 (0) - Rows total: 0 (0) Filter 5 (16.00 MB): - Rows processed: 0 (0) - Rows rejected: 0 (0) - Rows total: 0 (0)
helifu, the RowBatchQueuePutWaitTime could indicate that the queue was full, which would mean that the Kudu scanners were adding data to the queue faster than the consumer could remove it, or it could indicate that the CPU is oversubscribed and the Kudu scanners are context-switching when blocked on the queue.
Thanks Tim, your reply is correct. Maybe I will try to dig into https://issues.apache.org/jira/browse/IMPALA-2797
helifu, what you are seeing is an indication that the scan node was producing row batches faster than the consumer of the scan node (which seems to be the probe side of the PHJ node, if I read the profile correctly) could consume them. In particular, the probe time in the hash join node differs a lot between these two profiles, which is consistent with the observation that the amount of time the scanner threads in the Kudu scan node spent blocked waiting to insert into the row batch queue is much larger than in the Hdfs scan node. While one may argue that IMPALA-2797 will help by throttling the number of active scanner threads so the main execution thread can get more CPU time, I am not entirely convinced this is the problem here, because the Hdfs scan node has the same level of imbalance and its consumer doesn't suffer as much, unless the Kudu scan node's tuple materialization is much more CPU intensive, which doesn't seem to be the case according to ScannerThreadsUserTime or MaterializeTupleTime. So there seems to be something else wrong here with the PHJ probing.
Another key thing to note is that once the row batch queue in the scan node fills up, it's already implicitly throttling all scanner threads so the main execution thread shouldn't be drowned out unless there is something else running on the host at that time.
Longer term, the imbalance between the scanner threads and the main execution thread should go away once we move over to multi-threading (IMPALA-3902).
HASH_JOIN_NODE (id=6):(Total: 3m41s, non-child: 3m24s, % non-child: 92.34%) ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously - BuildRows: 227.55M (227546432) - BuildTime: 1m15s - NumHashTableBuildsSkipped: 0 (0) - PeakMemoryUsage: 17.14 GB (18399351552) - ProbeRows: 2.00B (2002021102) - ProbeRowsPartitioned: 0 (0) - ProbeTime: 2m5s <<<-----
vs
HASH_JOIN_NODE (id=6):(Total: 6m29s, non-child: 6m12s, % non-child: 95.83%) ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously - BuildRows: 227.55M (227546432) - BuildTime: 53s604ms - NumHashTableBuildsSkipped: 0 (0) - PeakMemoryUsage: 11.63 GB (12491762944) - ProbeRows: 2.00B (2000049901) - ProbeRowsPartitioned: 0 (0) - ProbeTime: 5m15s <<------
We recently saw a case where a minor change to an earlier version of the code gave a big speedup:
http://gerrit.cloudera.org:8080/#/c/741/