Uploaded image for project: 'IMPALA'
  1. IMPALA
  2. IMPALA-6100

test_exchange_delays flaky under ASAN

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: Impala 2.11.0
    • Fix Version/s: Impala 2.11.0
    • Component/s: Infrastructure
    • Labels:

      Description

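      TestExchangeDelays.test_exchange_large_delay fails intermittently under ASAN: the
      'QueryTest/exchange-delays-zero-rows' test case expects the error "Sender timed out
      waiting for receiver fragment instance", but the query finishes without raising it
      (see the log below). We may need to adjust the timeouts further under ASAN.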

      03:39:15 =================================== FAILURES ===================================
      03:39:15  TestExchangeDelays.test_exchange_large_delay[exec_option: {'batch_size': 0, 'num_nodes': 0, 'disable_codegen_rows_threshold': 0, 'disable_codegen': False, 'abort_on_error': 1, 'exec_single_node_rows_threshold': 0} | table_format: text/none] 
      03:39:15 
      03:39:15 self = <test_exchange_delays.TestExchangeDelays object at 0x36c8790>
      03:39:15 vector = <tests.common.test_vector.ImpalaTestVector object at 0x3ff1f50>
      03:39:15 
      03:39:15     @pytest.mark.execute_serially
      03:39:15     @CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=10000"
      03:39:15           " --datastream_sender_timeout_ms=1")
      03:39:15     def test_exchange_large_delay(self, vector):
      03:39:15       """Test delays in registering data stream receivers where all of the batches sent
      03:39:15         will time out before the receiver registers. Before IMPALA-2987, this scenario
      03:39:15         resulted in the query hanging.
      03:39:15         """
      03:39:15       self.run_test_case('QueryTest/exchange-delays', vector)
      03:39:15     
      03:39:15       # Test the special case when no batches are sent and the EOS message times out.
      03:39:15 >     self.run_test_case('QueryTest/exchange-delays-zero-rows', vector)
      03:39:15 
      03:39:15 custom_cluster/test_exchange_delays.py:52: 
      03:39:15 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      03:39:15 
      03:39:15 self = <test_exchange_delays.TestExchangeDelays object at 0x36c8790>
      03:39:15 test_file_name = 'QueryTest/exchange-delays-zero-rows'
      03:39:15 vector = <tests.common.test_vector.ImpalaTestVector object at 0x3ff1f50>
      03:39:15 use_db = None, multiple_impalad = False, encoding = None, test_file_vars = None
      03:39:15 
      03:39:15     def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=False,
      03:39:15         encoding=None, test_file_vars=None):
      03:39:15       """
      03:39:15         Runs the queries in the specified test based on the vector values
      03:39:15     
      03:39:15         Runs the query using targeting the file format/compression specified in the test
      03:39:15         vector and the exec options specified in the test vector. If multiple_impalad=True
      03:39:15         a connection to a random impalad will be chosen to execute each test section.
      03:39:15         Otherwise, the default impalad client will be used.
      03:39:15         Additionally, the encoding for all test data can be specified using the 'encoding'
      03:39:15         parameter. This is useful when data is ingested in a different encoding (ex.
      03:39:15         latin). If not set, the default system encoding will be used.
      03:39:15         If a dict 'test_file_vars' is provided, then all keys will be replaced with their
      03:39:15         values in queries before they are executed. Callers need to avoid using reserved key
      03:39:15         names, see 'reserved_keywords' below.
      03:39:15         """
      03:39:15       table_format_info = vector.get_value('table_format')
      03:39:15       exec_options = vector.get_value('exec_option')
      03:39:15     
      03:39:15       # Resolve the current user's primary group name.
      03:39:15       group_id = pwd.getpwnam(getuser()).pw_gid
      03:39:15       group_name = grp.getgrgid(group_id).gr_name
      03:39:15     
      03:39:15       target_impalad_clients = list()
      03:39:15       if multiple_impalad:
      03:39:15         target_impalad_clients =\
      03:39:15             map(ImpalaTestSuite.create_impala_client, IMPALAD_HOST_PORT_LIST)
      03:39:15       else:
      03:39:15         target_impalad_clients = [self.client]
      03:39:15     
      03:39:15       # Change the database to reflect the file_format, compression codec etc, or the
      03:39:15       # user specified database for all targeted impalad.
      03:39:15       for impalad_client in target_impalad_clients:
      03:39:15         ImpalaTestSuite.change_database(impalad_client,
      03:39:15             table_format_info, use_db, pytest.config.option.scale_factor)
      03:39:15         impalad_client.set_configuration(exec_options)
      03:39:15     
      03:39:15       sections = self.load_query_test_file(self.get_workload(), test_file_name,
      03:39:15           encoding=encoding)
      03:39:15       for test_section in sections:
      03:39:15         if 'SHELL' in test_section:
      03:39:15           assert len(test_section) == 1, \
      03:39:15             "SHELL test sections can't contain other sections"
      03:39:15           cmd = test_section['SHELL']\
      03:39:15             .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)\
      03:39:15             .replace('$IMPALA_HOME', IMPALA_HOME)
      03:39:15           if use_db: cmd = cmd.replace('$DATABASE', use_db)
      03:39:15           LOG.info("Shell command: " + cmd)
      03:39:15           check_call(cmd, shell=True)
      03:39:15           continue
      03:39:15     
      03:39:15         if 'QUERY' not in test_section:
      03:39:15           assert 0, 'Error in test file %s. Test cases require a -- QUERY section.\n%s' %\
      03:39:15               (test_file_name, pprint.pformat(test_section))
      03:39:15     
      03:39:15         if 'SETUP' in test_section:
      03:39:15           self.execute_test_case_setup(test_section['SETUP'], table_format_info)
      03:39:15     
      03:39:15         # TODO: support running query tests against different scale factors
      03:39:15         query = QueryTestSectionReader.build_query(test_section['QUERY']
      03:39:15             .replace('$GROUP_NAME', group_name)
      03:39:15             .replace('$IMPALA_HOME', IMPALA_HOME)
      03:39:15             .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)
      03:39:15             .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str()))
      03:39:15         if use_db: query = query.replace('$DATABASE', use_db)
      03:39:15     
      03:39:15         reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$GROUP_NAME",
      03:39:15                              "$IMPALA_HOME", "$NAMENODE", "$QUERY", "$SECONDARY_FILESYSTEM"]
      03:39:15     
      03:39:15         if test_file_vars:
      03:39:15           for key, value in test_file_vars.iteritems():
      03:39:15             if key in reserved_keywords:
      03:39:15               raise RuntimeError("Key {0} is reserved".format(key))
      03:39:15             query = query.replace(key, value)
      03:39:15     
      03:39:15         if 'QUERY_NAME' in test_section:
      03:39:15           LOG.info('Query Name: \n%s\n' % test_section['QUERY_NAME'])
      03:39:15     
      03:39:15         # Support running multiple queries within the same test section, only verifying the
      03:39:15         # result of the final query. The main use case is to allow for 'USE database'
      03:39:15         # statements before a query executes, but it is not limited to that.
      03:39:15         # TODO: consider supporting result verification of all queries in the future
      03:39:15         result = None
      03:39:15         target_impalad_client = choice(target_impalad_clients)
      03:39:15         query_options_changed = []
      03:39:15         try:
      03:39:15           user = None
      03:39:15           if 'USER' in test_section:
      03:39:15             # Create a new client so the session will use the new username.
      03:39:15             user = test_section['USER'].strip()
      03:39:15             target_impalad_client = self.create_impala_client()
      03:39:15           for query in query.split(';'):
      03:39:15             set_pattern_match = SET_PATTERN.match(query)
      03:39:15             if set_pattern_match != None:
      03:39:15               query_options_changed.append(set_pattern_match.groups()[0])
      03:39:15             result = self.__execute_query(target_impalad_client, query, user=user)
      03:39:15         except Exception as e:
      03:39:15           if 'CATCH' in test_section:
      03:39:15             self.__verify_exceptions(test_section['CATCH'], str(e), use_db)
      03:39:15             continue
      03:39:15           raise
      03:39:15         finally:
      03:39:15           if len(query_options_changed) > 0:
      03:39:15             self.__restore_query_options(query_options_changed, target_impalad_client)
      03:39:15     
      03:39:15         if 'CATCH' in test_section and '__NO_ERROR__' not in test_section['CATCH']:
      03:39:15           expected_str = " or ".join(test_section['CATCH']).strip() \
      03:39:15             .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
      03:39:15             .replace('$NAMENODE', NAMENODE) \
      03:39:15             .replace('$IMPALA_HOME', IMPALA_HOME)
      03:39:15 >         assert False, "Expected exception: %s" % expected_str
      03:39:15 E         AssertionError: Expected exception: Sender timed out waiting for receiver fragment instance
      03:39:15 
      03:39:15 common/impala_test_suite.py:406: AssertionError
      

        Attachments

          Activity

            People

            • Assignee:
              tarmstrong Tim Armstrong
              Reporter:
              tarmstrong Tim Armstrong
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: