Details
Type: Sub-task
Status: Resolved
Priority: Blocker
Resolution: Fixed
Description
While running scalability tests with KRPC, I noticed that broadcast operations where the source file exists on a single node are very slow compared to runs without KRPC.
Further investigation showed that, with KRPC, the RowBatch is serialized once per channel instead of once per broadcast, which wastes a lot of CPU.
Baseline code
```cpp
// current_thrift_batch_ is *not* the one that was written by the last call
// to Serialize()
RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
// SendBatch() will block if there are still in-flight rpcs (and those will
// reference the previously written thrift batch)
for (int i = 0; i < channels_.size(); ++i) {
  RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
}
```
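The baseline serializes each batch once into `current_thrift_batch_`, which (per the comment) is not the buffer written by the previous call, because in-flight RPCs may still reference that one. Below is a minimal sketch of that serialize-once, alternating-buffer broadcast. It assumes two buffers suffice because `SendBatch()` blocks until earlier RPCs on a channel finish. `SerializedBatch`, `Channel`, `BroadcastSender`, and the string-based "serialization" are hypothetical stand-ins, not Impala's classes, and the sketch does not model blocking or RPCs.

```cpp
#include <array>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a serialized (e.g. Thrift-encoded) row batch.
struct SerializedBatch {
  std::string bytes;
};

// Hypothetical channel: records which serialized buffer it was handed.
// Unlike the real SendBatch(), it does not block on in-flight RPCs.
struct Channel {
  std::vector<const SerializedBatch*> sent;
  void SendBatch(const SerializedBatch* batch) { sent.push_back(batch); }
};

// Broadcast sender that serializes each batch once into one of two
// alternating buffers and sends that same buffer on every channel.
class BroadcastSender {
 public:
  explicit BroadcastSender(int num_channels) : channels_(num_channels) {
    assert(num_channels > 0);
  }

  void Send(const std::string& rows) {
    // Write into the buffer NOT used by the previous call, since in-flight
    // RPCs from that call may still reference it.
    SerializedBatch* current = &batches_[next_];
    current->bytes = Serialize(rows);  // exactly one serialization per batch
    for (Channel& ch : channels_) ch.SendBatch(current);
    next_ = 1 - next_;  // alternate between the two buffers
  }

  int serialize_calls() const { return serialize_calls_; }
  const std::vector<Channel>& channels() const { return channels_; }

 private:
  // Stand-in for the real (CPU-heavy) serialization step.
  std::string Serialize(const std::string& rows) {
    ++serialize_calls_;
    return "serialized:" + rows;
  }

  std::array<SerializedBatch, 2> batches_;
  int next_ = 0;
  int serialize_calls_ = 0;
  std::vector<Channel> channels_;
};
```

With four channels and two batches, `serialize_calls()` is 2 rather than 8, and consecutive batches are written to different buffers.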
KRPC code
```cpp
if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
  // SendBatch() will block if there are still in-flight rpcs (and those will
  // reference the previously written thrift batch)
  for (int i = 0; i < channels_.size(); ++i) {
    RETURN_IF_ERROR(channels_[i]->SendBatch(batch));
  }
}
```
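The KRPC snippet passes the unserialized `batch` to every channel, so, as the description notes, serialization runs once per channel. The sketch below contrasts that pattern with serializing once and reusing the bytes, as the Thrift baseline does. `RowBatch`, `Serializer`, `Channel`, `Transmit`, and both broadcast functions are hypothetical illustrations, not Impala's API. The sketch only counts serializer invocations and does not model RPCs or back-pressure.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical row batch.
struct RowBatch {
  std::string rows;
};

// Hypothetical serializer that counts how often it runs.
struct Serializer {
  int calls = 0;
  std::string Serialize(const RowBatch& batch) {
    ++calls;
    return "ser:" + batch.rows;  // stand-in for the real (expensive) encoding
  }
};

// Hypothetical channel that only records the payloads it was asked to send.
struct Channel {
  std::vector<std::string> payloads;
  void Transmit(const std::string& serialized) { payloads.push_back(serialized); }
};

// Pattern described in this issue for the KRPC path: every channel
// serializes the batch itself, so N channels cost N serializations.
void BroadcastPerChannel(const RowBatch& batch, std::vector<Channel>& channels,
                         Serializer& ser) {
  for (Channel& ch : channels) ch.Transmit(ser.Serialize(batch));
}

// Serialize once, then hand the same serialized bytes to every channel.
void BroadcastSerializeOnce(const RowBatch& batch, std::vector<Channel>& channels,
                            Serializer& ser) {
  const std::string serialized = ser.Serialize(batch);
  for (Channel& ch : channels) ch.Transmit(serialized);
}
```

With eight channels, `BroadcastPerChannel` runs the serializer eight times while `BroadcastSerializeOnce` runs it once, and both deliver identical bytes. A real sender would share one buffer (for example by pointer) rather than copying it per channel, which brings back the buffer-lifetime concern that the baseline's alternating buffers address.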
KRPC broadcast fragment
```
Averaged Fragment F01:(Total: 20s344ms, non-child: 494.783ms, % non-child: 2.43%)
  split sizes: min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
  completion times: min:20s353ms max:20s353ms mean: 20s353ms stddev:0.000ns
  execution rates: min:1.80 MB/sec max:1.80 MB/sec mean:1.80 MB/sec stddev:0.02 B/sec
  num instances: 1
   - AverageThreadTokens: 1.70
   - BloomFilterBytes: 0
   - PeakMemoryUsage: 52.35 MB (54889274)
   - PeakReservation: 0
   - PeakUsedReservation: 0
   - PerHostPeakMemUsage: 175.82 MB (184360081)
   - RowsProduced: 402.00K (402000)
   - TotalNetworkReceiveTime: 0.000ns
   - TotalNetworkSendTime: 134.533ms
   - TotalStorageWaitTime: 18.921ms
   - TotalThreadsInvoluntaryContextSwitches: 2.18K (2177)
   - TotalThreadsTotalWallClockTime: 34s438ms
   - TotalThreadsSysTime: 247.962ms
   - TotalThreadsUserTime: 20s127ms
   - TotalThreadsVoluntaryContextSwitches: 410 (410)
  Fragment Instance Lifecycle Timings:
   - ExecTime: 20s313ms
   - ExecTreeExecTime: 44.718ms
   - OpenTime: 12.231ms
   - ExecTreeOpenTime: 28.946us
   - PrepareTime: 18.960ms
   - ExecTreePrepareTime: 126.983us
  KrpcDataStreamSender (dst_id=7):(Total: 19s776ms, non-child: 19s776ms, % non-child: 100.00%)
   - BytesSent: 3.63 GB (3902930720)
   - OverallThroughput: 188.21 MB/sec
   - PeakMemoryUsage: 405.23 KB (414960)
   - RowsReturned: 402.00K (402000)
   - RpcRetry: 0 (0)
   - SerializeBatchTime: 19s771ms
   - UncompressedRowBatchSize: 7.85 GB (8427946930)
```
Baseline fragment
```
Averaged Fragment F01:(Total: 3s440ms, non-child: 1s331ms, % non-child: 38.71%)
  split sizes: min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
  completion times: min:3s449ms max:3s449ms mean: 3s449ms stddev:0.000ns
  execution rates: min:10.62 MB/sec max:10.62 MB/sec mean:10.62 MB/sec stddev:0.70 B/sec
  num instances: 1
   - AverageThreadTokens: 1.71
   - BloomFilterBytes: 0
   - PeakMemoryUsage: 52.35 MB (54889274)
   - PeakReservation: 0
   - PeakUsedReservation: 0
   - PerHostPeakMemUsage: 175.82 MB (184362391)
   - RowsProduced: 402.00K (402000)
   - TotalNetworkReceiveTime: 0.000ns
   - TotalNetworkSendTime: 2s538ms
   - TotalStorageWaitTime: 19.156ms
   - TotalThreadsInvoluntaryContextSwitches: 673 (673)
   - TotalThreadsTotalWallClockTime: 5s778ms
   - TotalThreadsSysTime: 265.959ms
   - TotalThreadsUserTime: 760.884ms
   - TotalThreadsVoluntaryContextSwitches: 3.10K (3104)
  Fragment Instance Lifecycle Timings:
   - ExecTime: 3s396ms
   - ExecTreeExecTime: 47.197ms
   - OpenTime: 13.312ms
   - ExecTreeOpenTime: 31.004us
   - PrepareTime: 31.097ms
   - ExecTreePrepareTime: 112.645us
  DataStreamSender (dst_id=7):(Total: 2s034ms, non-child: 2s034ms, % non-child: 100.00%)
   - BytesSent: 3.63 GB (3902930720)
   - NetworkThroughput(*): 2.40 GB/sec
   - OverallThroughput: 1.79 GB/sec
   - PeakMemoryUsage: 405.23 KB (414960)
   - RowsReturned: 402.00K (402000)
   - SerializeBatchTime: 507.673ms
   - TransmitDataRPCTime: 1s512ms
   - UncompressedRowBatchSize: 7.85 GB (8427946930)
```
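Both profiles report identical BytesSent (3.63 GB) and UncompressedRowBatchSize (7.85 GB). The two senders therefore ship the same data, and the gap is CPU time spent before the network. Dividing the KRPC counters by the corresponding baseline counters gives:

```latex
\begin{aligned}
\frac{\text{SerializeBatchTime}_{\text{KRPC}}}{\text{SerializeBatchTime}_{\text{baseline}}}
  &= \frac{19771\ \text{ms}}{507.673\ \text{ms}} \approx 38.9 \\
\frac{\text{TotalThreadsUserTime}_{\text{KRPC}}}{\text{TotalThreadsUserTime}_{\text{baseline}}}
  &= \frac{20127\ \text{ms}}{760.884\ \text{ms}} \approx 26.5 \\
\frac{\text{Total}_{\text{KRPC}}}{\text{Total}_{\text{baseline}}}
  &= \frac{20344\ \text{ms}}{3440\ \text{ms}} \approx 5.9
\end{aligned}
```

In the KRPC profile, SerializeBatchTime (19s771ms) accounts for almost all of the sender's 19s776ms. This is consistent with serialization being repeated once per channel.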