IMPALA-3160

Queries may not get cancelled if cancellation pool hits MAX_CANCELLATION_QUEUE_SIZE



      Description

      ImpalaServer::MembershipCallback() uses the statestore's membership topic updates to detect backends that have gone down. It also cancels every in-flight query that was running on a failed backend: for each backend key in the query_locations_ map (which maps a backend to the set of queries running on it), it checks whether that backend is still in the current membership, and treats any backend that is missing as failed.
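The failure-detection pass can be modeled as a pure function. This is a simplified sketch, not Impala's code: backend addresses and query IDs are plain `std::string`s standing in for `TNetworkAddress` and `TUniqueId`, and `QueryLocations`, `Membership`, and `FindQueriesToCancel` are hypothetical names. Unlike the real callback, it only reads the map, erases nothing, and skips `CloseConnections()`.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins for TNetworkAddress and TUniqueId.
using Backend = std::string;
using QueryId = std::string;

// Models query_locations_: backend -> queries with fragments running on it.
using QueryLocations = std::map<Backend, std::unordered_set<QueryId>>;
// Backends reported as alive by the latest statestore topic update.
using Membership = std::unordered_set<Backend>;

// For every backend in `locations` that is absent from `membership`, record it as a
// failed host of each query that ran on it. Returns query id -> failed backends.
std::map<QueryId, std::vector<Backend>> FindQueriesToCancel(
    const QueryLocations& locations, const Membership& membership) {
  std::map<QueryId, std::vector<Backend>> queries_to_cancel;
  for (const auto& entry : locations) {
    if (membership.count(entry.first) != 0) continue;  // Backend is still alive.
    for (const QueryId& query_id : entry.second) {
      queries_to_cancel[query_id].push_back(entry.first);
    }
  }
  return queries_to_cancel;
}
```

For example, with backends `a:22000` (queries `q1`, `q2`), `b:22000` (`q2`) and `c:22000` (`q3`), and only `c:22000` still alive, the result maps `q1` to `{a:22000}` and `q2` to `{a:22000, b:22000}`; `q3` is left alone.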

      If enqueuing these cancellations would push the cancellation thread pool's queue past MAX_CANCELLATION_QUEUE_SIZE, the queries are not cancelled at all; the code expects the queue to have drained by the next heartbeat, at which point the cancellations would be retried.
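The queue-size check reduces to a single comparison. In this sketch `ShouldDeferCancellations` is a hypothetical helper, and the limit is passed in as a parameter because the issue does not state the value of MAX_CANCELLATION_QUEUE_SIZE. Note that `queries_to_cancel.size()` counts queries, not backends, and the check is all-or-nothing: the whole batch is skipped if it does not fit.

```cpp
#include <cassert>
#include <cstddef>

// Mirrors the condition in the excerpt: defer the entire batch if adding it to the
// current queue would exceed the limit. `max_queue_size` plays the role of
// MAX_CANCELLATION_QUEUE_SIZE (its real value is not given in the issue).
bool ShouldDeferCancellations(std::size_t queue_size, std::size_t num_to_cancel,
                              std::size_t max_queue_size) {
  return queue_size + num_to_cancel > max_queue_size;
}
```

With a limit of 8, a queue of 5 plus 3 new cancellations is accepted (5 + 3 = 8 is not greater than 8), while 6 + 3 is deferred. As written, a batch of more than 8 queries would be deferred even with an empty queue.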

      However, by that point the failed backend has already been erased from query_locations_. On the next heartbeat the callback therefore never finds that backend again, and the queries that were running on it are never cancelled.
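The lost cancellation can be reproduced in a small model that follows the excerpt's order of operations (erase first, check the queue afterwards). `CallbackModel` and its members are hypothetical; a `std::vector` stands in for the cancellation thread pool's queue, and locking and connection cleanup are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>

using Backend = std::string;
using QueryId = std::string;

// Simplified, hypothetical model of the state touched by MembershipCallback().
struct CallbackModel {
  std::map<Backend, std::unordered_set<QueryId>> query_locations;  // query_locations_
  std::vector<QueryId> cancellation_queue;  // the cancellation thread pool's queue
  std::size_t max_queue_size = 0;           // MAX_CANCELLATION_QUEUE_SIZE

  // One heartbeat, in the same order as the reported code.
  void OnMembershipUpdate(const std::unordered_set<Backend>& membership) {
    std::map<QueryId, std::vector<Backend>> queries_to_cancel;
    for (auto it = query_locations.begin(); it != query_locations.end();) {
      if (membership.count(it->first) == 0) {
        for (const QueryId& q : it->second) queries_to_cancel[q].push_back(it->first);
        it = query_locations.erase(it);  // Erased before we know the cancel will happen.
      } else {
        ++it;
      }
    }
    if (cancellation_queue.size() + queries_to_cancel.size() > max_queue_size) {
      return;  // "Cancellation queue is full": the batch is dropped, never retried.
    }
    for (const auto& entry : queries_to_cancel) cancellation_queue.push_back(entry.first);
  }
};
```

Starting with failed backend `a` (running `q1`), a queue already holding two entries, and a limit of 2, the first heartbeat erases `a` and drops the cancellation of `q1`. Even after the queue is cleared, the second heartbeat finds nothing to cancel, so `q1` is never cancelled.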

          // Maps from query id (to be cancelled) to a list of failed Impalads that are
          // the cause of the cancellation.
          map<TUniqueId, vector<TNetworkAddress> > queries_to_cancel; // VVVV: LOCAL MAP
          {
            // Build a list of queries that are running on failed hosts (as evidenced by their
            // absence from the membership list).
            // TODO: crash-restart failures can give false negatives for failed Impala daemons.
            lock_guard<mutex> l(query_locations_lock_);
            QueryLocations::const_iterator loc_entry = query_locations_.begin();
            while (loc_entry != query_locations_.end()) {
              if (current_membership.find(loc_entry->first) == current_membership.end()) {
                unordered_set<TUniqueId>::const_iterator query_id = loc_entry->second.begin();
                // Add failed backend locations to all queries that ran on that backend.
                for (; query_id != loc_entry->second.end(); ++query_id) {
                  vector<TNetworkAddress>& failed_hosts = queries_to_cancel[*query_id];
                  failed_hosts.push_back(loc_entry->first);
                }
                exec_env_->impalad_client_cache()->CloseConnections(loc_entry->first);
                // We can remove the location wholesale once we know the backend has failed. To
                // do so safely during iteration, we have to be careful not to invalidate the
                // current iterator, so copy the iterator to do the erase(..) and advance the original.
                QueryLocations::const_iterator failed_backend = loc_entry;
                ++loc_entry;
      
                // VVVV: WE ERASE THE ENTRY FROM THE GLOBAL MAP HERE.
                query_locations_.erase(failed_backend);
              } else {
                ++loc_entry;
              }
            }
          }
      
          if (cancellation_thread_pool_->GetQueueSize() + queries_to_cancel.size() >
              MAX_CANCELLATION_QUEUE_SIZE) {
            // Ignore the cancellations - we'll be able to process them on the next heartbeat
            // instead.
            LOG_EVERY_N(WARNING, 60) << "Cancellation queue is full";
      
            // VVVV: WE DON'T CANCEL HERE AND BY THE NEXT HEARTBEAT, WE WON'T FIND THE FAILED BACKEND AGAIN.
          }
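One possible fix, sketched against the same simplified model (it is not taken from this issue or from any Impala commit): collect the failed backends first, and erase them from the map only after their cancellations have actually been queued. If the queue is full, the backends stay in the map, so the next heartbeat finds them again and retries. `FixedCallbackModel` and its members are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>

using Backend = std::string;
using QueryId = std::string;

// Hypothetical model with erasure deferred until the cancellations are enqueued.
struct FixedCallbackModel {
  std::map<Backend, std::unordered_set<QueryId>> query_locations;  // query_locations_
  std::vector<QueryId> cancellation_queue;  // the cancellation thread pool's queue
  std::size_t max_queue_size = 0;           // MAX_CANCELLATION_QUEUE_SIZE

  void OnMembershipUpdate(const std::unordered_set<Backend>& membership) {
    std::map<QueryId, std::vector<Backend>> queries_to_cancel;
    std::vector<Backend> failed_backends;
    for (const auto& entry : query_locations) {
      if (membership.count(entry.first) != 0) continue;  // Backend is alive.
      failed_backends.push_back(entry.first);
      for (const QueryId& q : entry.second) queries_to_cancel[q].push_back(entry.first);
    }
    if (cancellation_queue.size() + queries_to_cancel.size() > max_queue_size) {
      return;  // Queue full: keep failed backends in the map so the next heartbeat retries.
    }
    for (const auto& entry : queries_to_cancel) cancellation_queue.push_back(entry.first);
    // Only now is it safe to forget the failed backends.
    for (const Backend& backend : failed_backends) query_locations.erase(backend);
  }
};
```

In the real callback, query_locations_ is guarded by query_locations_lock_, which the excerpt releases before checking the queue, so a fix along these lines would also have to consider changes to the map between the scan and the erase (for example by re-acquiring the lock). As the TODO in the excerpt hints, if a failed backend restarts before the retry, it reappears in the membership and its old queries would still be missed.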
      

              People

              • Assignee: Unassigned
              • Reporter: Sailesh Mukil