IMPALA-5910

Data stream sender timeout causes query hang when 0 rows sent through channel


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
    • Fix Version/s: None
    • Component/s: Distributed Exec
    • Labels:

      Description

      We saw this bug on an internal cluster that was under heavy load and having trouble opening connections. It manifested as a query hang.

      Here are the fragments:

      Host	Num. instances	Num. remaining instances	Done	Peak mem. consumption	Time since last report (ms)
      h40:22000	1	1	false	8192	3369
      h30:22000	2	0	true	112573520	34000630
      h24:22000	2	1	false	96816946	3371
      

      The thread is hung waiting on the exchange:

      https://h24:25000/thread-group?group=fragment-execution
      Thread name	Id	Cumulative User CPU(s)	Cumulative Kernel CPU(s)	Cumulative IO-wait(s)
      profile-report (finst:641169ef2c0c8ea:8edf87e100000003) 	31426 	1.92 	0.56 	0
      exec-finstance (finst:641169ef2c0c8ea:8edf87e100000003) 	31420 	0.14 	0 	0
      
      Pstack output:
      Thread 1054 (Thread 0x7fbe693cd700 (LWP 31420)):
      #0  0x0000003fdd40b5bc in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
      #1  0x0000000000820a73 in boost::condition_variable::wait(boost::unique_lock<boost::mutex>&) ()
      #2  0x0000000000a1cd01 in impala::DataStreamRecvr::SenderQueue::GetBatch(impala::RowBatch**) ()
      #3  0x0000000000a1d050 in impala::DataStreamRecvr::GetBatch(impala::RowBatch**) ()
      #4  0x0000000000c3247d in impala::ExchangeNode::FillInputRowBatch(impala::RuntimeState*) ()
      #5  0x0000000000c32e61 in impala::ExchangeNode::Open(impala::RuntimeState*) ()
      #6  0x0000000000ccd505 in impala::PartitionedAggregationNode::Open(impala::RuntimeState*) ()
      #7  0x0000000000a50f8d in impala::FragmentInstanceState::Open() ()
      #8  0x0000000000a526fb in impala::FragmentInstanceState::Exec() ()
      #9  0x0000000000a2f158 in impala::QueryState::ExecFInstance(impala::FragmentInstanceState*) ()
      #10 0x0000000000bd3a72 in impala::Thread::SuperviseThread(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*) ()
      #11 0x0000000000bd41d4 in boost::detail::thread_data<boost::_bi::bind_t<void, void (*)(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, boost::function<void ()()>, impala::Promise<long>*), boost::_bi::list4<boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, boost::_bi::value<boost::function<void ()()> >, boost::_bi::value<impala::Promise<long>*> > > >::run() ()
      #12 0x0000000000e5fb9a in ?? ()
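
      To make the blocked wait concrete, here is a minimal standalone sketch of the
      kind of condition-variable wait the exec-finstance thread is stuck in. The names
      SenderQueue and GetBatch() mirror the frames in the stack above, but the code is
      illustrative only, not Impala's DataStreamRecvr: the consumer wakes up only when
      a batch arrives or when every sender has signalled end-of-stream, so if that
      signal is lost the wait never returns.

      // Simplified, illustrative sketch only -- not Impala's DataStreamRecvr.
      #include <condition_variable>
      #include <deque>
      #include <iostream>
      #include <mutex>
      #include <thread>

      struct RowBatch { int num_rows = 0; };

      class SenderQueue {
       public:
        explicit SenderQueue(int num_senders) : num_senders_(num_senders) {}

        // Blocks until a batch arrives or every sender has reported end-of-stream.
        // Returns nullptr once the stream is exhausted. If no batch ever arrives
        // *and* no end-of-stream signal arrives, this wait never wakes up.
        RowBatch* GetBatch() {
          std::unique_lock<std::mutex> l(lock_);
          cv_.wait(l, [this] { return !batches_.empty() || num_senders_ == 0; });
          if (batches_.empty()) return nullptr;  // all senders closed, stream done
          current_ = batches_.front();
          batches_.pop_front();
          return &current_;
        }

        void AddBatch(const RowBatch& batch) {
          std::lock_guard<std::mutex> l(lock_);
          batches_.push_back(batch);
          cv_.notify_one();
        }

        // The end-of-stream/CloseSender() path: one fewer sender to wait for.
        void DecrementSenders() {
          std::lock_guard<std::mutex> l(lock_);
          --num_senders_;
          if (num_senders_ == 0) cv_.notify_all();
        }

       private:
        std::mutex lock_;
        std::condition_variable cv_;
        std::deque<RowBatch> batches_;
        RowBatch current_;
        int num_senders_;
      };

      int main() {
        SenderQueue queue(1);
        // The sender sends no rows, only end-of-stream. If this call were lost
        // (the failure mode analysed below), GetBatch() would block forever.
        std::thread sender([&queue] { queue.DecrementSenders(); });
        RowBatch* batch = queue.GetBatch();
        std::cout << "GetBatch returned "
                  << (batch != nullptr ? "a batch" : "end-of-stream") << std::endl;
        sender.join();
        return 0;
      }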
      

      The plan is:

      F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
      Per-Host Resources: mem-estimate=240.00MB mem-reservation=34.00MB
      01:AGGREGATE [STREAMING]
      |  group by: c1, c2, concat(CAST(c3 AS STRING), ';', c4)
      |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
      |  tuple-ids=1 row-size=46B cardinality=unavailable
      |
      00:SCAN HDFS [d.t, RANDOM]
         partitions=1/3000 files=2 size=82.09MB
         predicates: identifier = 'xxxxxxxxxxx'
         stats-rows=unavailable extrapolated-rows=disabled
         table stats: rows=unavailable size=unavailable
         columns missing stats: ....
         parquet dictionary predicates: identifier = 'xxxxxxxxxxxx'
         mem-estimate=112.00MB mem-reservation=0B
         tuple-ids=0 row-size=45B cardinality=unavailable
      

      I looked at the profile and saw that all the scans produced 0 rows.

      I saw in one log that there was a data stream sender timeout:

      h24-impalad.INFO:I0906 11:55:25.695996  2894 data-stream-mgr.cc:130] Datastream sender timed-out waiting for recvr for fragment instance: 641169ef2c0c8ea:8edf87e100000003 (time-out was: 2m). Increase --datastream_sender_timeout_ms if you see this message frequently.
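
      The message comes from the stream manager on the receiving host, which waits
      with a deadline for the receiver to register before giving up. Below is a
      simplified sketch of that kind of deadline-bounded lookup: the name
      FindRecvrOrWait() also appears in the CloseSender() code quoted further down and
      the two-minute deadline comes from the log above, but the registry and its
      interface here are assumptions for illustration, not Impala's DataStreamMgr.

      // Simplified, illustrative sketch only -- not Impala's DataStreamMgr.
      #include <chrono>
      #include <condition_variable>
      #include <iostream>
      #include <map>
      #include <memory>
      #include <mutex>
      #include <string>

      struct Recvr {};

      class StreamRegistry {
       public:
        // Blocks until a receiver registers under finst_id or the timeout expires.
        // Returns nullptr on timeout -- the case behind the log line above.
        std::shared_ptr<Recvr> FindRecvrOrWait(const std::string& finst_id,
                                               std::chrono::milliseconds timeout) {
          std::unique_lock<std::mutex> l(lock_);
          bool found = cv_.wait_for(l, timeout,
              [&] { return recvrs_.count(finst_id) > 0; });
          if (!found) {
            std::cout << "sender timed out waiting for recvr for fragment instance: "
                      << finst_id << std::endl;
            return nullptr;
          }
          return recvrs_[finst_id];
        }

        void RegisterRecvr(const std::string& finst_id) {
          std::lock_guard<std::mutex> l(lock_);
          recvrs_[finst_id] = std::make_shared<Recvr>();
          cv_.notify_all();
        }

       private:
        std::mutex lock_;
        std::condition_variable cv_;
        std::map<std::string, std::shared_ptr<Recvr>> recvrs_;
      };

      int main() {
        StreamRegistry registry;
        // The receiver never registers, so the wait expires. The rest of the report
        // chases why this timeout never became an error on the sending fragment.
        std::shared_ptr<Recvr> recvr = registry.FindRecvrOrWait(
            "641169ef2c0c8ea:8edf87e100000003", std::chrono::milliseconds(100));
        std::cout << "recvr " << (recvr != nullptr ? "found" : "not found")
                  << std::endl;
        return 0;
      }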
      

      It looks like all non-hung instances for that query finished with an OK status, which means that FIS::Exec() returned OK.

      h24-impalad.INFO:I0906 11:56:41.212296 31421 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000001 #in-flight=613 status=OK
      h30-impalad.INFO:I0906 11:55:26.268643  4850 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000002 #in-flight=137 status=OK
      h30-impalad.INFO:I0906 11:56:41.213317  4848 query-state.cc:353] Instance completed. instance_id=641169ef2c0c8ea:8edf87e100000004 #in-flight=121 status=OK
      

      Tracing from this OK status back to the data stream sender timeout, we get:

      • FIS::Exec() should fail with an error from DataStreamSender::Send() or DataStreamSender::FlushFinal() if there's a timeout.
      • For this query, Send() is not applicable because the fragment returned 0 rows.
      • Therefore DataStreamSender::FlushFinal() should have returned an error.
      • However, FlushFinal() only returns an error when one of its Channel::FlushAndSendEos() calls does (see the sketch after this list). Since the instances completed with status OK, none of those flush calls returned an error.
      • FlushAndSendEos() calls DoTransmitDataRpc() as below.
          rpc_status_ = DoTransmitDataRpc(&client, params, &res);
          if (!rpc_status_.ok()) {
            return Status(rpc_status_.code(),
               Substitute("TransmitData(eos=true) to $0 failed:\n $1",
                TNetworkAddressToString(address_), rpc_status_.msg().msg()));
          }
          return Status(res.status);
        
      • rpc_status_ must have been OK, because the error message that would be built from it above never shows up in the logs.
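
      As a sketch of the aggregation referred to in the list above (DataStreamSender::FlushFinal()
      and Channel::FlushAndSendEos() are the real method names; the simplified Status type and
      loop below are assumptions for illustration), FlushFinal() can only surface an error that
      one of its channels reports, so if every FlushAndSendEos() returns OK the fragment finishes
      with OK as well.

      // Illustrative sketch only -- not Impala's DataStreamSender.
      #include <string>
      #include <vector>

      struct Status {
        bool ok;
        std::string msg;
        static Status OK() { return {true, ""}; }
        static Status Error(const std::string& m) { return {false, m}; }
      };

      struct Channel {
        // In the real code this issues the TransmitData(eos=true) RPC shown in the
        // excerpt above and wraps any rpc_status_ / res.status failure.
        Status FlushAndSendEos() { return eos_status; }
        Status eos_status = Status::OK();
      };

      // If every channel reports OK -- which is what happens when the receiver-side
      // CloseSender() swallows the timeout -- the sender finishes with Status::OK()
      // and FIS::Exec() never sees an error.
      Status FlushFinal(std::vector<Channel>& channels) {
        for (Channel& channel : channels) {
          Status status = channel.FlushAndSendEos();
          if (!status.ok) return status;  // first channel error fails the fragment
        }
        return Status::OK();
      }

      int main() {
        std::vector<Channel> channels(2);  // every eos RPC "succeeded" with OK
        return FlushFinal(channels).ok ? 0 : 1;
      }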

      Therefore we have two possible scenarios:

      1. rpc_status_ was OK in spite of the RPC failing, or
      2. res contained an OK status in spite of the send timing out.

      The bug appears to stem from the fact that if no rows are sent over the connection, then AddData() is never called, only CloseSender().

      void ImpalaServer::TransmitData(
          TTransmitDataResult& return_val, const TTransmitDataParams& params) {
        VLOG_ROW << "TransmitData(): instance_id=" << params.dest_fragment_instance_id
                 << " node_id=" << params.dest_node_id
                 << " #rows=" << params.row_batch.num_rows
                 << " sender_id=" << params.sender_id
                 << " eos=" << (params.eos ? "true" : "false");
        // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
        // of having to copy its data
        if (params.row_batch.num_rows > 0) {
          Status status = exec_env_->ThriftStreamMgr()->AddData(
              params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
              params.sender_id);
          status.SetTStatus(&return_val);
          if (!status.ok()) {
            // should we close the channel here as well?
            return;
          }
        }
      
        if (params.eos) {
          exec_env_->stream_mgr()->CloseSender(
              params.dest_fragment_instance_id, params.dest_node_id,
              params.sender_id).SetTStatus(&return_val);
        }
      }
      

      CloseSender() doesn't return an error if it can't find the receiver, which means that an error status is never propagated back to the caller. That would explain why the timeout was logged but no fragments failed.

      Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
          PlanNodeId dest_node_id, int sender_id) {
        VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
                  << ", node=" << dest_node_id;
        bool unused;
        shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
            &unused);
        if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
          
        {
          // Remove any closed streams that have been in the cache for more than
          // STREAM_EXPIRATION_TIME_MS.
          lock_guard<mutex> l(lock_);
          ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
          int64_t now = MonotonicMillis();
          int32_t before = closed_stream_cache_.size(); 
          while (it != closed_stream_expirations_.end() && it->first < now) {
            closed_stream_cache_.erase(it->second);
            closed_stream_expirations_.erase(it++);
          }
          DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
          int32_t after = closed_stream_cache_.size();
          if (before != after) {
            VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
                       << ", eviction took: "
                       << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
          }
        }
        return Status::OK();
      }   
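
      Purely to make that gap concrete, here is a hypothetical, simplified sketch of the
      propagation the analysis points at: if the receiver cannot be found (for example because
      the wait already timed out), returning an error here would let it reach res.status, then
      Channel::FlushAndSendEos(), then FlushFinal() and finally FIS::Exec(). This is only an
      illustration of the missing error path, not the change that was actually made (the issue
      was resolved as a duplicate).

      // Hypothetical, simplified sketch only -- not the actual fix.
      #include <iostream>
      #include <memory>
      #include <string>

      struct Status {
        bool ok;
        std::string msg;
        static Status OK() { return {true, ""}; }
        static Status Error(const std::string& m) { return {false, m}; }
      };

      struct Recvr {
        void RemoveSender(int sender_id) { (void)sender_id; }
      };

      // Stand-in for FindRecvrOrWait(): nullptr means the receiver was never
      // registered (or the wait timed out), as in the log above.
      std::shared_ptr<Recvr> FindRecvr(const std::string& finst_id) {
        (void)finst_id;
        return nullptr;
      }

      Status CloseSender(const std::string& finst_id, int sender_id) {
        std::shared_ptr<Recvr> recvr = FindRecvr(finst_id);
        if (recvr == nullptr) {
          // The quoted code returns Status::OK() here, which is what lets the sending
          // fragments finish "successfully" while the receiver hangs.
          return Status::Error(
              "Receiver for fragment instance " + finst_id + " not found");
        }
        recvr->RemoveSender(sender_id);
        return Status::OK();
      }

      int main() {
        Status s = CloseSender("641169ef2c0c8ea:8edf87e100000003", 0);
        std::cout << (s.ok ? std::string("OK") : "ERROR: " + s.msg) << std::endl;
        return s.ok ? 0 : 1;
      }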
      

    People

    • Assignee: Unassigned
    • Reporter: Tim Armstrong (tarmstrong)