IMPALA / IMPALA-2567 KRPC milestone 1 / IMPALA-6041

RowBatch is serialized once per destination channel for Broadcast exchange

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Component/s: Distributed Exec

    Description

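      While running scalability tests with KRPC, I noticed that broadcast exchanges whose source data resides on a single node are much slower than without KRPC.

      Further investigation showed that with KRPC the RowBatch is serialized once per destination channel instead of once per batch, which wastes a lot of CPU. In the profiles below, SerializeBatchTime is 19s771ms with KRPC versus 507.673ms for the baseline, and it accounts for nearly all of the KrpcDataStreamSender time.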

      Baseline code

          // current_thrift_batch_ is *not* the one that was written by the last call
          // to Serialize()
          RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
          // SendBatch() will block if there are still in-flight rpcs (and those will
          // reference the previously written thrift batch)
          for (int i = 0; i < channels_.size(); ++i) {
            RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
          }
      

      KRPC code

        if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
          // SendBatch() will block if there are still in-flight rpcs (and those will
          // reference the previously written thrift batch)
          for (int i = 0; i < channels_.size(); ++i) {
            RETURN_IF_ERROR(channels_[i]->SendBatch(batch));
          }
        }
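
The difference between the two snippets above can be sketched in isolation. This is a minimal illustration, not Impala's code: `RowBatch`, `SerializedBatch`, `Channel`, `Serialize`, `BroadcastPerChannel`, and `BroadcastSerializeOnce` are hypothetical stand-ins, and the counter exists only to make the number of serialization passes visible.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; not Impala's actual classes.
struct RowBatch { std::string rows; };
struct SerializedBatch { std::string payload; };

static int g_serialize_calls = 0;  // counts how many serialization passes ran

// Stand-in for the expensive step (the real code also compresses the batch).
SerializedBatch Serialize(const RowBatch& batch) {
  ++g_serialize_calls;
  return SerializedBatch{batch.rows};
}

struct Channel {
  std::vector<std::string> sent;

  // Per-channel path: every channel serializes the batch for itself.
  void SendBatch(const RowBatch& batch) {
    sent.push_back(Serialize(batch).payload);
  }

  // Shared path: the channel only transmits an already serialized batch.
  void SendSerialized(const SerializedBatch& serialized) {
    sent.push_back(serialized.payload);
  }
};

// One serialization pass per destination channel for each batch.
void BroadcastPerChannel(std::vector<Channel>& channels, const RowBatch& batch) {
  for (Channel& channel : channels) channel.SendBatch(batch);
}

// One serialization pass per batch, reused for every destination channel.
void BroadcastSerializeOnce(std::vector<Channel>& channels,
                            const RowBatch& batch) {
  const SerializedBatch shared = Serialize(batch);
  for (Channel& channel : channels) channel.SendSerialized(shared);
}
```

With N channels, `BroadcastPerChannel` runs `Serialize` N times per batch while `BroadcastSerializeOnce` runs it once, and both deliver identical payloads to every channel. A real sender would also have to keep the shared buffer alive until all in-flight RPCs that reference it have completed, which this sketch does not model.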
      

      KRPC broadcast fragment

      Averaged Fragment F01:(Total: 20s344ms, non-child: 494.783ms, % non-child: 2.43%)
            split sizes:  min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
            completion times: min:20s353ms  max:20s353ms  mean: 20s353ms  stddev:0.000ns
            execution rates: min:1.80 MB/sec  max:1.80 MB/sec  mean:1.80 MB/sec  stddev:0.02 B/sec
            num instances: 1
             - AverageThreadTokens: 1.70 
             - BloomFilterBytes: 0
             - PeakMemoryUsage: 52.35 MB (54889274)
             - PeakReservation: 0
             - PeakUsedReservation: 0
             - PerHostPeakMemUsage: 175.82 MB (184360081)
             - RowsProduced: 402.00K (402000)
             - TotalNetworkReceiveTime: 0.000ns
             - TotalNetworkSendTime: 134.533ms
             - TotalStorageWaitTime: 18.921ms
             - TotalThreadsInvoluntaryContextSwitches: 2.18K (2177)
             - TotalThreadsTotalWallClockTime: 34s438ms
               - TotalThreadsSysTime: 247.962ms
               - TotalThreadsUserTime: 20s127ms
             - TotalThreadsVoluntaryContextSwitches: 410 (410)
            Fragment Instance Lifecycle Timings:
               - ExecTime: 20s313ms
                 - ExecTreeExecTime: 44.718ms
               - OpenTime: 12.231ms
                 - ExecTreeOpenTime: 28.946us
               - PrepareTime: 18.960ms
                 - ExecTreePrepareTime: 126.983us
            KrpcDataStreamSender (dst_id=7):(Total: 19s776ms, non-child: 19s776ms, % non-child: 100.00%)
               - BytesSent: 3.63 GB (3902930720)
               - OverallThroughput: 188.21 MB/sec
               - PeakMemoryUsage: 405.23 KB (414960)
               - RowsReturned: 402.00K (402000)
               - RpcRetry: 0 (0)
               - SerializeBatchTime: 19s771ms
               - UncompressedRowBatchSize: 7.85 GB (8427946930)
      

      Baseline fragment

      Averaged Fragment F01:(Total: 3s440ms, non-child: 1s331ms, % non-child: 38.71%)
            split sizes:  min: 36.65 MB, max: 36.65 MB, avg: 36.65 MB, stddev: 0
            completion times: min:3s449ms  max:3s449ms  mean: 3s449ms  stddev:0.000ns
            execution rates: min:10.62 MB/sec  max:10.62 MB/sec  mean:10.62 MB/sec  stddev:0.70 B/sec
            num instances: 1
             - AverageThreadTokens: 1.71 
             - BloomFilterBytes: 0
             - PeakMemoryUsage: 52.35 MB (54889274)
             - PeakReservation: 0
             - PeakUsedReservation: 0
             - PerHostPeakMemUsage: 175.82 MB (184362391)
             - RowsProduced: 402.00K (402000)
             - TotalNetworkReceiveTime: 0.000ns
             - TotalNetworkSendTime: 2s538ms
             - TotalStorageWaitTime: 19.156ms
             - TotalThreadsInvoluntaryContextSwitches: 673 (673)
             - TotalThreadsTotalWallClockTime: 5s778ms
               - TotalThreadsSysTime: 265.959ms
               - TotalThreadsUserTime: 760.884ms
             - TotalThreadsVoluntaryContextSwitches: 3.10K (3104)
            Fragment Instance Lifecycle Timings:
               - ExecTime: 3s396ms
                 - ExecTreeExecTime: 47.197ms
               - OpenTime: 13.312ms
                 - ExecTreeOpenTime: 31.004us
               - PrepareTime: 31.097ms
                 - ExecTreePrepareTime: 112.645us
            DataStreamSender (dst_id=7):(Total: 2s034ms, non-child: 2s034ms, % non-child: 100.00%)
               - BytesSent: 3.63 GB (3902930720)
               - NetworkThroughput(*): 2.40 GB/sec
               - OverallThroughput: 1.79 GB/sec
               - PeakMemoryUsage: 405.23 KB (414960)
               - RowsReturned: 402.00K (402000)
               - SerializeBatchTime: 507.673ms
               - TransmitDataRPCTime: 1s512ms
               - UncompressedRowBatchSize: 7.85 GB (8427946930)
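
For a quick comparison of the two profiles above, the following sketch recomputes the key ratios from the reported counters. The constant and function names are made up for illustration; only the numeric values come from the profiles. Serialization takes roughly 39 times longer with KRPC, makes up about 99.97% of the KrpcDataStreamSender total, and the fragment as a whole is roughly 5.9 times slower.

```cpp
#include <cassert>

// Values copied from the two profiles above, converted to milliseconds.
constexpr double kKrpcSerializeMs = 19771.0;       // KRPC SerializeBatchTime
constexpr double kBaselineSerializeMs = 507.673;   // baseline SerializeBatchTime
constexpr double kKrpcSenderTotalMs = 19776.0;     // KrpcDataStreamSender total
constexpr double kKrpcFragmentMs = 20344.0;        // KRPC fragment F01 total
constexpr double kBaselineFragmentMs = 3440.0;     // baseline fragment F01 total

// How many times slower serialization is with KRPC than with the baseline.
constexpr double SerializeSlowdown() {
  return kKrpcSerializeMs / kBaselineSerializeMs;
}

// Fraction of the KRPC sender's total time spent serializing batches.
constexpr double SerializeShareOfSender() {
  return kKrpcSerializeMs / kKrpcSenderTotalMs;
}

// How many times slower the whole fragment is with KRPC.
constexpr double FragmentSlowdown() {
  return kKrpcFragmentMs / kBaselineFragmentMs;
}
```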
      

          People

            kwho Michael Ho
            mmokhtar Mostafa Mokhtar