Uploaded image for project: 'IMPALA'
  1. IMPALA
  2. IMPALA-3772

Failure in TestAdmissionControllerStress

    XMLWordPrintableJSON

    Details

      Description

      I ran into this when running an exhaustive build for an unrelated scanner change.

      http://sandbox.jenkins.cloudera.com/job/impala-private-build-and-test/3477/consoleFull

      09:23:57 ============================= test session starts ==============================
      09:23:57 platform linux2 -- Python 2.6.6 -- py-1.4.30 -- pytest-2.7.2
      09:23:57 rootdir: /data/jenkins/workspace/impala-private-build-and-test/repos, inifile: 
      09:23:57 plugins: random, xdist
      09:24:00 collected 88 items
      09:24:00 
      09:24:00 custom_cluster/test_admission_controller.py ........................F..............
      10:02:24 custom_cluster/test_alloc_fail.py ..
      10:04:06 custom_cluster/test_breakpad.py ......
      10:10:14 custom_cluster/test_delegation.py ...
      10:11:02 custom_cluster/test_exchange_delays.py ..
      10:12:31 custom_cluster/test_hdfs_fd_caching.py .
      10:13:07 custom_cluster/test_hive_parquet_timestamp_conversion.py ..
      10:14:00 custom_cluster/test_insert_behaviour.py ..
      10:15:04 custom_cluster/test_kudu_not_available.py .
      10:15:18 custom_cluster/test_legacy_joins_aggs.py .
      10:15:39 custom_cluster/test_parquet_max_page_header.py .
      10:16:43 custom_cluster/test_permanent_udfs.py ...
      10:32:47 custom_cluster/test_query_expiration.py ...
      10:33:48 custom_cluster/test_redaction.py ....
      10:35:55 custom_cluster/test_s3a_access.py s
      10:35:55 custom_cluster/test_scratch_disk.py ....
      10:38:22 custom_cluster/test_session_expiration.py .
      10:38:43 custom_cluster/test_spilling.py sss.
      10:50:01 authorization/test_authorization.py ..
      10:50:42 authorization/test_grant_revoke.py .
      10:52:23 ../../Impala-auxiliary-tests/tests/aux_custom_cluster_tests/test_ldap.py xxxxx
      10:52:23 
      10:52:23 =================================== FAILURES ===================================
      10:52:23  TestAdmissionControllerStress.test_admission_controller_with_configs[num_queries: 50 | submission_delay_ms: 0 | exec_option: {'disable_codegen': False, 'abort_on_error': 1, 'exec_single_node_rows_threshold': 0, 'batch_size': 0, 'num_nodes': 0} | table_format: text/none | round_robin_submission: True] 
      10:52:23 
      10:52:23 self = <test_admission_controller.TestAdmissionControllerStress object at 0x5645590>
      10:52:23 vector = <tests.common.test_vector.TestVector object at 0x5d8ba90>
      10:52:23 
      10:52:23     @pytest.mark.execute_serially
      10:52:23     @CustomClusterTestSuite.with_args(
      10:52:23         impalad_args=impalad_admission_ctrl_config_args(),
      10:52:23         statestored_args=_STATESTORED_ARGS)
      10:52:23     def test_admission_controller_with_configs(self, vector):
      10:52:23       self.pool_name = 'root.queueB'
      10:52:23 >     self.run_admission_test(vector, {'request_pool': self.pool_name})
      10:52:23 
      10:52:23 custom_cluster/test_admission_controller.py:681: 
      10:52:23 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      10:52:23 
      10:52:23 self = <test_admission_controller.TestAdmissionControllerStress object at 0x5645590>
      10:52:23 vector = <tests.common.test_vector.TestVector object at 0x5d8ba90>
      10:52:23 additional_query_options = {'request_pool': 'root.queueB'}
      10:52:23 
      10:52:23     def run_admission_test(self, vector, additional_query_options):
      10:52:23       LOG.debug("Starting test case with parameters: %s", vector)
      10:52:23       self.impalads = self.cluster.impalads
      10:52:23       round_robin_submission = vector.get_value('round_robin_submission')
      10:52:23       submission_delay_ms = vector.get_value('submission_delay_ms')
      10:52:23       if not round_robin_submission:
      10:52:23         self.impalads = [self.impalads[0]]
      10:52:23     
      10:52:23       num_queries = vector.get_value('num_queries')
      10:52:23       assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
      10:52:23       initial_metrics = self.get_admission_metrics();
      10:52:23       log_metrics("Initial metrics: ", initial_metrics);
      10:52:23     
      10:52:23       # Want query_num to start at 1 because this gets used as the limit in the query to
      10:52:23       # help debugging (we can associate a running query with a thread). If we start at 0,
      10:52:23       # that query would be evaluated as a constant expression and never hit the WAIT debug
      10:52:23       # action.
      10:52:23       for query_num in xrange(1, num_queries + 1):
      10:52:23         impalad = self.impalads[query_num % len(self.impalads)]
      10:52:23         thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
      10:52:23             query_num, self.executing_threads)
      10:52:23         thread.start()
      10:52:23         self.all_threads.append(thread)
      10:52:23         sleep(submission_delay_ms / 1000.0)
      10:52:23     
      10:52:23       # Wait for all of the queries to be admitted, queued, or rejected (as reported
      10:52:23       # by the impalad metrics).
      10:52:23       LOG.debug("Wait for initial admission decisions")
      10:52:23       (metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
      10:52:23           ['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
      10:52:23       # Also wait for the threads that submitted the queries to start executing
      10:52:23       self.wait_for_admitted_threads(metric_deltas['admitted'])
      10:52:23     
      10:52:23       # Check that the admission decisions are reasonable given the test parameters
      10:52:23       # The number of admitted and queued requests should be at least the configured limits
      10:52:23       # but less than or equal to those limits times the number of impalads.
      10:52:23       assert metric_deltas['admitted'] >= MAX_NUM_CONCURRENT_QUERIES
      10:52:23       assert metric_deltas['admitted'] <= MAX_NUM_CONCURRENT_QUERIES * len(self.impalads)
      10:52:23       assert metric_deltas['queued'] >=\
      10:52:23           min(num_queries - metric_deltas['admitted'], MAX_NUM_QUEUED_QUERIES)
      10:52:23       assert metric_deltas['queued'] <= MAX_NUM_QUEUED_QUERIES * len(self.impalads)
      10:52:23       assert metric_deltas['rejected'] ==\
      10:52:23           num_queries - metric_deltas['admitted'] - metric_deltas['queued']
      10:52:23       initial_metric_deltas = metric_deltas
      10:52:23     
      10:52:23       while len(self.executing_threads) > 0:
      10:52:23         curr_metrics = self.get_admission_metrics();
      10:52:23         log_metrics("Main loop, curr_metrics: ", curr_metrics);
      10:52:23         num_to_cancel = len(self.executing_threads)
      10:52:23         LOG.debug("Main loop, will cancel %s queries", num_to_cancel)
      10:52:23         self.cancel_admitted_queries(num_to_cancel)
      10:52:23         self.wait_for_metric_changes(['released'], curr_metrics, num_to_cancel)
      10:52:23     
      10:52:23         num_queued_remaining =\
      10:52:23             curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']
      10:52:23         expected_admitted = min(num_queued_remaining, MAX_NUM_CONCURRENT_QUERIES)
      10:52:23         (metric_deltas, _) = self.wait_for_metric_changes(['admitted'], curr_metrics,
      10:52:23             expected_admitted)
      10:52:23         self.wait_for_admitted_threads(metric_deltas['admitted'])
      10:52:23         # Wait a few heartbeats to ensure the admission controllers have reached a steady
      10:52:23         # state or we may find an impalad dequeue more requests after we capture metrics.
      10:52:23         self.wait_for_heartbeats(4)
      10:52:23     
      10:52:23       final_metrics = self.get_admission_metrics();
      10:52:23       log_metrics("Final metrics: ", final_metrics, logging.INFO);
      10:52:23       metric_deltas = compute_metric_deltas(final_metrics, initial_metrics,
      10:52:23           final_metrics.keys())
      10:52:23 >     assert metric_deltas['timed-out'] == 0
      10:52:23 E     assert 4 == 0
      10:52:23 
      10:52:23 custom_cluster/test_admission_controller.py:639: AssertionError
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                mjacobs Matthew Jacobs
                Reporter:
                tarmstrong Tim Armstrong
              • Votes:
                0 Vote for this issue
                Watchers:
                5 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: