  1. IMPALA
  2. IMPALA-8239

Check failed: deferred_rpcs_.empty() || (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: Impala 3.2.0
    • Fix Version/s: Impala 3.2.0
    • Component/s: Backend

    Description

      F0221 12:36:27.886497 157443 krpc-data-stream-recvr.cc:234] 3843237c92f4c6b5:b5aa170b00000092] Check failed: deferred_rpcs_.empty() || (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0 
      *** Check failure stack trace: ***
          @          0x47eb92c  google::LogMessage::Fail()
          @          0x47ed1d1  google::LogMessage::SendToLog()
          @          0x47eb306  google::LogMessage::Flush()
          @          0x47ee8cd  google::LogMessageFatal::~LogMessageFatal()
          @          0x1ed07a0  impala::KrpcDataStreamRecvr::SenderQueue::GetBatch()
          @          0x1ed65e7  impala::KrpcDataStreamRecvr::GetBatch()
          @          0x22c5d67  impala::ExchangeNode::FillInputRowBatch()
          @          0x22c5953  impala::ExchangeNode::Open()
          @          0x240079a  impala::BlockingJoinNode::ProcessBuildInputAndOpenProbe()
          @          0x238720f  impala::PartitionedHashJoinNode::Open()
          @          0x23e6d76  impala::AggregationNode::Open()
          @          0x1f52b2f  impala::FragmentInstanceState::Open()
          @          0x1f4f7f3  impala::FragmentInstanceState::Exec()
          @          0x1f62dd9  impala::QueryState::ExecFInstance()
          @          0x1f610bb  _ZZN6impala10QueryState15StartFInstancesEvENKUlvE_clEv
          @          0x1f6421a  _ZN5boost6detail8function26void_function_obj_invoker0IZN6impala10QueryState15StartFInstancesEvEUlvE_vE6invokeERNS1_15function_bufferE
          @          0x1d766b3  boost::function0<>::operator()()
          @          0x2223e4e  impala::Thread::SuperviseThread()
          @          0x222c1d2  boost::_bi::list5<>::operator()<>()
          @          0x222c0f6  boost::_bi::bind_t<>::operator()()
          @          0x222c0b9  boost::detail::thread_data<>::run()
          @          0x3715469  thread_proxy
          @     0x7f262e1efe24  start_thread
          @     0x7f262df1d34c  __clone
      

      Immediately before it in the log a different thread printed:

      E0221 12:36:27.557099 157280 krpc-data-stream-sender.cc:344] 4145938c29f31877:6ed16e5e0000022d] channel send to 10.17.211.20:27000 failed: (fragment_instance_id=4145938c29f31877:6ed16e5e0000024c): Memory limit exceeded: Failed to allocate row batch
      EXCHANGE_NODE (id=189) could not allocate 8.00 KB without exceeding limit.
      Error occurred on backend vc0510.halxg.cloudera.com:22000
      Memory left in process limit: 12.11 GB
      Memory left in query limit: 541.65 MB
      Query(4145938c29f31877:6ed16e5e00000000): Limit=1.39 GB Reservation=704.88 MB ReservationLimit=1.11 GB OtherMemory=174.47 MB Total=879.35 MB Peak=926.55 MB
        Unclaimed reservations: Reservation=430.88 MB OtherMemory=0 Total=430.88 MB Peak=740.31 MB
        <fragment instance>: Reservation=0 OtherMemory=0 Total=0 Peak=1.59 MB
          SORT_NODE (id=118): Total=0 Peak=4.00 KB
          AGGREGATION_NODE (id=203): Reservation=0 OtherMemory=0 Total=0 Peak=81.12 KB
            GroupingAggregator 0: Total=0 Peak=81.12 KB
          EXCHANGE_NODE (id=202): Reservation=0 OtherMemory=0 Total=0 Peak=0
            KrpcDeferredRpcs: Total=0 Peak=0
          KrpcDataStreamSender (dst_id=204): Total=0 Peak=1.75 KB
          CodeGen: Total=0 Peak=1.50 MB
        <fragment instance>: Reservation=0 OtherMemory=0 Total=0 Peak=2.14 MB
          AGGREGATION_NODE (id=117): Reservation=0 OtherMemory=0 Total=0 Peak=81.12 KB
            GroupingAggregator 0: Total=0 Peak=81.12 KB
          UNION_NODE (id=0): Total=0 Peak=0
          AGGREGATION_NODE (id=144): Reservation=0 OtherMemory=0 Total=0 Peak=34.12 KB
            GroupingAggregator 0: Total=0 Peak=34.12 KB
      

      Logs for the crashing thread:

      I0221 12:36:23.677255 157443 query-state.cc:624] 3843237c92f4c6b5:b5aa170b00000092] Executing instance. instance_id=3843237c92f4c6b5:b5aa170b00000092 fragment_idx=10 per_fragment_instance_idx=7 coord_state_idx=10 #in-flight=525
      

      The query was:

      I0221 12:36:18.419795 140378 impala-server.cc:1063] 3843237c92f4c6b5:b5aa170b00000000] Registered query query_id=3843237c92f4c6b5:b5aa170b00000000 session_id=f24bd763b83e0b5e:77f6ea4b07165d9b
      I0221 12:36:18.420265 140378 Frontend.java:1245] 3843237c92f4c6b5:b5aa170b00000000] Analyzing query: /* Mem: 218 MB. Coordinator: vc0515.halxg.cloudera.com. */
      -- RESULT MISMATCH FROM ORIGINAL in DECIMAL value
      -- FIXED. CAST RESULT QUOTIENT TO DECIMAL(15, 4), TAKE ACTUAL RESULT AS EXPECTED
      select promotions,total,cast(promotions*100.00/total as decimal(15,4))
      from
        (select sum(ss_ext_sales_price) promotions
         from  store_sales
              ,store
              ,promotion
              ,date_dim
              ,customer
              ,customer_address
              ,item
         where ss_sold_date_sk = d_date_sk
         and   ss_store_sk = s_store_sk
         and   ss_promo_sk = p_promo_sk
         and   ss_customer_sk= c_customer_sk
         and   ca_address_sk = c_current_addr_sk
         and   ss_item_sk = i_item_sk
         and   ca_gmt_offset = -5
         and   i_category = 'Jewelry'
         and   (p_channel_dmail = 'Y' or p_channel_email = 'Y' or p_channel_tv = 'Y')
         and   s_gmt_offset = -5
         and   d_year = 1998
         and   d_moy  = 1) promotional_sales,
        (select sum(ss_ext_sales_price) total
         from  store_sales
              ,store
              ,date_dim
              ,customer
              ,customer_address
              ,item
         where ss_sold_date_sk = d_date_sk
         and   ss_store_sk = s_store_sk
         and   ss_customer_sk= c_customer_sk
         and   ca_address_sk = c_current_addr_sk
         and   ss_item_sk = i_item_sk
         and   ca_gmt_offset = -5
         and   i_category = 'Jewelry'
         and   s_gmt_offset = -5
         and   d_year = 1998
         and   d_moy  = 1) all_sales
      order by promotions, total
      

      Logs of the crashing Impala and coordinator are in IMPALA-8239-logs.tar.gz

      Attachments

        1. IMPALA-8239-logs.tar.gz
          6.83 MB
          Tim Armstrong

          Activity

            kwho Michael Ho added a comment - edited

            It seems that there is a race window when KrpcDataStreamRecvr::SenderQueue::AddBatchWork() hits an error while deserializing a row batch. In particular, if the call comes from the deserialization thread inserting a deferred row batch, we end up with num_deserialize_tasks_pending_ == 0 and num_pending_enqueue_ == 0 right after returning from KrpcDataStreamRecvr::SenderQueue::AddBatchWork(). In addition, batch_queue_ will be empty. If a thread calls into KrpcDataStreamRecvr::SenderQueue::GetBatch() at this time, it will hit the DCHECK shown in the stack trace above. The expectation is that the query will be cancelled after the error is propagated back to the sender. In the meantime, however, in builds where the DCHECK is compiled out, the thread that calls KrpcDataStreamRecvr::SenderQueue::GetBatch() may block even though there may still be entries in the deferred_rpcs_ queue. We probably need better handling of this error case in KrpcDataStreamRecvr::SenderQueue::AddBatchWork():

              DCHECK_GT(num_pending_enqueue_, 0);
              --num_pending_enqueue_;
              if (UNLIKELY(!status.ok())) {
                recvr_->num_buffered_bytes_.Add(-batch_size);
                data_arrival_cv_.notify_one();
                return status;  // <<---- early return: the batch is not enqueued and no error is flagged
              }
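
The state described in the comment above can be reproduced with a small model of the counters. This is only a sketch: SenderQueueModel, ScheduleDrain(), RunDeserializeTask() and WouldTripCheck() are hypothetical names, the counters are plain integers rather than Impala's real members, and it assumes the check is evaluated only when the consumer is about to wait on an empty batch queue.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, simplified model of the sender queue state. Not Impala code.
struct SenderQueueModel {
  std::size_t deferred_rpcs = 0;  // stands in for deferred_rpcs_.size()
  std::size_t batch_queue = 0;    // stands in for batch_queue_.size()
  int num_deserialize_tasks_pending = 0;
  int num_pending_enqueue = 0;

  // A drain of one deferred RPC has been scheduled on the deserialization pool.
  void ScheduleDrain() { ++num_deserialize_tasks_pending; }

  // The deserialization thread takes one deferred RPC and tries to enqueue it.
  void RunDeserializeTask(bool deserialize_ok) {
    assert(num_deserialize_tasks_pending > 0 && deferred_rpcs > 0);
    --num_deserialize_tasks_pending;
    --deferred_rpcs;
    ++num_pending_enqueue;
    // Modeled after the quoted snippet: the counter drops before the status test.
    --num_pending_enqueue;
    if (!deserialize_ok) return;  // early return: nothing enqueued, nothing flagged
    ++batch_queue;
  }

  // True if a consumer about to wait on an empty batch queue would fail the check
  // deferred_rpcs_.empty() || (num_deserialize_tasks_pending_ + num_pending_enqueue_) > 0.
  bool WouldTripCheck() const {
    const bool check_holds =
        deferred_rpcs == 0 ||
        (num_deserialize_tasks_pending + num_pending_enqueue) > 0;
    return batch_queue == 0 && !check_holds;
  }
};
```

With two deferred RPCs and one scheduled drain, a failed deserialization leaves one deferred RPC behind, both counters at zero and the batch queue empty, which is the combination the check rejects. A successful deserialization leaves a batch in the queue, so the consumer never reaches the check.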
            

            jira-bot ASF subversion and git services added a comment

            Commit 1718f9c07bfac2e14d0d9a2de052dea8ac65f45f in impala's branch refs/heads/master from Michael Ho
            [ https://gitbox.apache.org/repos/asf?p=impala.git;h=1718f9c ]

            IMPALA-8239: Fix handling of failed deserialization of row batch

            Previously, when a row batch failed to be deserialized in the
            data stream receiver, we will return the error status to the
            sender of the row batch without inserting the row batch. The
            receiver will continue to operate without flagging any error.
            The assumption is that the sender will eventually cancel the
            query upon receiving the failed status.

            Normally, when a caller of GetBatch() successfully dequeues a row
            batch from the batch queue, it will kick off the draining of the
            row batches from the deferred queue into the normal batch queue,
            which will further continue the cycle of draining the deferred queue
            upon the next call to GetBatch() until the deferred queue becomes empty.

            When an error is hit when deserializing a deferred batch to be inserted
            into the batch queue, the existing code will simply not insert the row
            batch or flag any error. This breaks the cycle of the deferred queue
            draining as the batch queue may become empty forever. The caller of
            GetBatch() will block indefinitely until the query is cancelled.

            The existing code works fine as the expectation is that the query will
            be cancelled once the sender receives the error status from the RPC
            response. However, this behavior is still not ideal as it lets a query
            which has hit a fatal error to hold on to resources for extended period
            of time.

            This patch fixes the problem by explicitly recording any error during
            row batch insertion in an error status object. Callers of GetBatch()
            will now also poll for this status object while waiting for row batch
            to show up and bail out early if there is any error.

            A new test case has been added to simulate the problematic case above.

            Change-Id: Iaa74937b046d95484887533be548249e96078755
            Reviewed-on: http://gerrit.cloudera.org:8080/12567
            Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
            Reviewed-by: Thomas Marshall <tmarshall@cloudera.com>
            Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
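
The approach described in the commit message (record the insertion error, and have the consumer look at it while waiting) can be sketched roughly as follows. This is not the patch itself: BatchQueueSketch, AddBatch(), RecordError() and the string-based batch and error types are hypothetical stand-ins, and delivering already-queued batches before reporting the error is a choice made for this sketch only.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a receiver-side batch queue. Not Impala code.
class BatchQueueSketch {
 public:
  // Producer: a batch was deserialized successfully and can be queued.
  void AddBatch(std::string batch) {
    {
      std::lock_guard<std::mutex> l(lock_);
      batches_.push_back(std::move(batch));
    }
    data_arrival_cv_.notify_all();
  }

  // Producer: inserting a batch failed. Keep the first error and wake consumers
  // instead of silently dropping the batch.
  void RecordError(const std::string& msg) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!has_error_) {
        has_error_ = true;
        error_ = msg;
      }
    }
    data_arrival_cv_.notify_all();
  }

  // Consumer: blocks until a batch arrives or an error has been recorded.
  // Returns true and fills *out on success; returns false and fills *error
  // when the queue is empty and an error was recorded.
  bool GetBatch(std::string* out, std::string* error) {
    std::unique_lock<std::mutex> l(lock_);
    data_arrival_cv_.wait(l, [this] { return !batches_.empty() || has_error_; });
    if (batches_.empty()) {
      *error = error_;
      return false;
    }
    *out = std::move(batches_.front());
    batches_.pop_front();
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable data_arrival_cv_;
  std::deque<std::string> batches_;
  bool has_error_ = false;
  std::string error_;
};
```

Because the wait predicate includes the error flag, a consumer that would otherwise sleep on an empty queue returns as soon as an error is recorded, rather than waiting for the query to be cancelled.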


            People

              Assignee: kwho Michael Ho
              Reporter: tarmstrong Tim Armstrong