IMPALA-6692: When partition exchange is followed by sort, each sort node becomes a synchronization point across the cluster

Parent: IMPALA-7301 Resource management for exchange node and datastream sender

Details

    Description

      The issue described in this JIRA applies to:

      • Analytical functions
      • Writes to Partitioned Parquet tables
      • Writes to Kudu tables

      When inserting into a Kudu table from Impala, the plan is something like HDFS SCAN -> Partition Exchange -> Partial Sort -> Kudu Insert.

      The query initially makes good progress, then slows down significantly, and very few nodes make further progress.

      While the insert is running, the query goes through distinct phases:

      • Phase 1
        • Scan is reading data fast, sending data through to exchange
        • Partial Sort keeps accumulating batches
        • Network and CPU is busy, life appears to be OK
      • Phase 2
        • One of the Sort operators reaches its memory limit and stops calling ExchangeNode::GetNext for a while
        • This creates back pressure against the DataStreamSenders
        • The Partial Sort doesn't call GetNext again until it has finished sorting GBs of data (Partial Sort memory is unbounded as of 03/16/2018)
        • All exchange operators in the cluster eventually get blocked on that Sort operator and can no longer make progress
        • After a while the Sort is able to accept more batches, which temporarily unblocks execution across the cluster
        • Then another Sort operator reaches its memory limit and the cycle repeats
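      The Phase 2 stall is the classic bounded-queue back-pressure pattern: a producer feeding a small bounded queue can only run `capacity` batches ahead of a paused consumer, just as the senders can only run a few batches ahead of a Sort that has stopped calling GetNext. A minimal, hypothetical sketch (illustrative class names, not Impala's actual RowBatchQueue or exchange code):

      ```cpp
      // Hypothetical sketch of the back-pressure mechanism, not Impala's
      // actual classes. When the consumer stalls (as the Partial Sort does
      // while sorting accumulated data), the producer blocks in
      // BlockingPut(); that is the stall that propagates from Sort to
      // Exchange to Sender to Scan in the stacks below.
      #include <cassert>
      #include <chrono>
      #include <condition_variable>
      #include <deque>
      #include <mutex>
      #include <thread>

      template <typename T>
      class BoundedBlockingQueue {
       public:
        explicit BoundedBlockingQueue(size_t capacity) : capacity_(capacity) {}

        // Producer side (scan / sender threads): blocks while the queue is full.
        void BlockingPut(T item) {
          std::unique_lock<std::mutex> l(mu_);
          not_full_.wait(l, [this] { return q_.size() < capacity_; });
          q_.push_back(std::move(item));
          not_empty_.notify_one();
        }

        // Consumer side (the ExchangeNode::GetNext role): blocks while empty.
        T BlockingGet() {
          std::unique_lock<std::mutex> l(mu_);
          not_empty_.wait(l, [this] { return !q_.empty(); });
          T item = std::move(q_.front());
          q_.pop_front();
          not_full_.notify_one();
          return item;
        }

       private:
        const size_t capacity_;
        std::deque<T> q_;
        std::mutex mu_;
        std::condition_variable not_full_, not_empty_;
      };

      // The producer tries to push 5 "batches" through a 2-slot queue while
      // the consumer pauses first ("sorting"); the producer can run at most
      // `capacity` batches ahead, then stalls until the consumer drains.
      int RunBackPressureDemo() {
        BoundedBlockingQueue<int> q(2);
        std::thread producer([&q] {
          for (int i = 0; i < 5; ++i) q.BlockingPut(i);
        });
        std::this_thread::sleep_for(std::chrono::milliseconds(50));  // consumer busy
        int consumed = 0;
        while (consumed < 5) {
          q.BlockingGet();
          ++consumed;
        }
        producer.join();
        return consumed;
      }

      int main() {
        assert(RunBackPressureDemo() == 5);
        return 0;
      }
      ```

      All names here are illustrative; in Impala the same roles are played by the DataStreamSender/Recvr queues and the row-batch queues visible in the stacks below.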

      Below are stacks from one of the blocked hosts:

      Sort node waiting on data from the exchange node; this sort hasn't started sorting because its memory limit hasn't been reached yet:

      Thread 90 (Thread 0x7f8d7d233700 (LWP 21625)):
      #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
      #1  0x00007fab1422174c in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.205/lib/impala/lib/libstdc++.so.6
      #2  0x0000000000b4d5aa in void std::_V2::condition_variable_any::wait<boost::unique_lock<impala::SpinLock> >(boost::unique_lock<impala::SpinLock>&) ()
      #3  0x0000000000b4ab6a in impala::KrpcDataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
      #4  0x0000000000b4b0c8 in impala::KrpcDataStreamRecvr::GetBatch(impala::RowBatch**) ()
      #5  0x0000000000dca7c5 in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
      #6  0x0000000000dcacae in impala::ExchangeNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
      #7  0x0000000001032ac3 in impala::PartialSortNode::GetNext(impala::RuntimeState*, impala::RowBatch*, bool*) ()
      #8  0x0000000000ba9c92 in impala::FragmentInstanceState::ExecInternal() ()
      #9  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
      #10 0x0000000000b9ab1a in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
      #11 0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
      #12 0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
      #13 0x00000000012d70ba in thread_proxy ()
      #14 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
      #15 0x0000003a6ece893d in clone () from /lib64/libc.so.6
      

      DataStreamSender blocked by back pressure from the DataStreamRecvr on the node whose Sort is spilling:

      Thread 89 (Thread 0x7fa8f6a15700 (LWP 21626)):
      #0  0x0000003a6f00ba5e in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
      #1  0x0000000001237e77 in impala::KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<impala::SpinLock>*) ()
      #2  0x0000000001238b8d in impala::KrpcDataStreamSender::Channel::TransmitData(impala::OutboundRowBatch const*) ()
      #3  0x0000000001238ca9 in impala::KrpcDataStreamSender::Channel::SerializeAndSendBatch(impala::RowBatch*) ()
      #4  0x0000000001238d2e in impala::KrpcDataStreamSender::Channel::SendCurrentBatch() ()
      #5  0x000000000123949f in impala::KrpcDataStreamSender::Send(impala::RuntimeState*, impala::RowBatch*) ()
      #6  0x0000000000ba9d47 in impala::FragmentInstanceState::ExecInternal() ()
      #7  0x0000000000bac7df in impala::FragmentInstanceState::Exec() ()
      
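      The timed wait in frame #1 above is the sender waiting for the completion callback of its previous RPC before transmitting the next batch. A minimal, hypothetical sketch of that single-in-flight-RPC pattern (illustrative names, not Impala's real KrpcDataStreamSender):

      ```cpp
      // Hypothetical sketch: a channel that permits one in-flight RPC at a
      // time. Before sending the next batch, the sender waits (with a
      // periodic timed wake-up, mirroring pthread_cond_timedwait in frame
      // #0) for the previous RPC's completion callback. A blocked receiver
      // delays that callback indefinitely, so the sender thread parks here.
      #include <cassert>
      #include <chrono>
      #include <condition_variable>
      #include <mutex>
      #include <thread>

      class Channel {
       public:
        // Returns only once no RPC is in flight; caller holds the lock.
        void WaitForRpc(std::unique_lock<std::mutex>* lock) {
          while (rpc_in_flight_) {
            rpc_done_.wait_for(*lock, std::chrono::milliseconds(50));
          }
        }

        void TransmitData() {
          std::unique_lock<std::mutex> l(mu_);
          WaitForRpc(&l);          // back-pressure point
          rpc_in_flight_ = true;   // "send" the next batch
        }

        // Invoked by the RPC completion callback once the remote receiver
        // has accepted the batch; a stalled receiver is what delays this.
        void RpcComplete() {
          std::lock_guard<std::mutex> l(mu_);
          rpc_in_flight_ = false;
          rpc_done_.notify_one();
        }

       private:
        std::mutex mu_;
        std::condition_variable rpc_done_;
        bool rpc_in_flight_ = false;
      };

      // Sends two batches against a receiver that acknowledges the first
      // only after 100 ms; returns how long the second send had to wait.
      long DemoSenderWaitMillis() {
        Channel ch;
        ch.TransmitData();  // first batch: nothing in flight, returns at once
        std::thread receiver([&ch] {
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          ch.RpcComplete();
        });
        auto start = std::chrono::steady_clock::now();
        ch.TransmitData();  // blocks until RpcComplete() fires
        auto waited = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start).count();
        receiver.join();
        return waited;
      }

      int main() {
        assert(DemoSenderWaitMillis() >= 90);
        return 0;
      }
      ```

      The real sender serializes the row batch before this wait (frame #3), but the blocking structure is the same: no new batch leaves until the receiver frees the previous one.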

      Scan node blocked due to back pressure from the DataStreamSender:

      Thread 68 (Thread 0x7fa929667700 (LWP 21648)):
      #0  0x0000003a6f00b68c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
      #1  0x0000000000dc9c60 in bool impala::BlockingQueue<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >::BlockingPut<std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> > >(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >&&) ()
      #2  0x0000000000dc61e6 in impala::ExecNode::RowBatchQueue::AddBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
      #3  0x0000000000dd1ca8 in impala::HdfsScanNode::AddMaterializedRowBatch(std::unique_ptr<impala::RowBatch, std::default_delete<impala::RowBatch> >) ()
      #4  0x0000000000e08adb in impala::HdfsParquetScanner::ProcessSplit() ()
      #5  0x0000000000dd219d in impala::HdfsScanNode::ProcessSplit(std::vector<impala::FilterContext, std::allocator<impala::FilterContext> > const&, impala::MemPool*, impala::io::ScanRange*) ()
      #6  0x0000000000dd3a12 in impala::HdfsScanNode::ScannerThread() ()
      #7  0x0000000000d5da9f in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*) ()
      #8  0x0000000000d5e29a in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::ThreadDebugInfo const*, impala::Promise<long>*), boost::_bi::list5<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::ThreadDebugInfo*>, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
      #9  0x00000000012d70ba in thread_proxy ()
      #10 0x0000003a6f007aa1 in start_thread () from /lib64/libpthread.so.0
      #11 0x0000003a6ece893d in clone () from /lib64/libc.so.6
      

      Attachments

        1. profile-spilling.txt
          11.35 MB
          Michael Ho
        2. Kudu table insert without KRPC no sort.txt
          78 kB
          Mostafa Mokhtar
        3. Kudu table insert without KRPC.txt
          82 kB
          Mostafa Mokhtar
        4. kudu_partial_sort_insert_vd1129.foo.com_2.txt
          6.35 MB
          Mostafa Mokhtar

            People

              rizaon Riza Suminto
              mmokhtar Mostafa Mokhtar