IMPALA-11189: Concurrent insert ACID tests are broken in local catalog mode

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Catalog
    • Labels: ghx-label-5

    Description

      The stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) can fail repeatedly in local catalog mode. When it fails, the concurrent checker query (select * from <table>) returns duplicated rows, as in the output reported below, where the row (wid=2, i=2) appears twice.

      The failure can be reproduced fairly easily by making TestConcurrentAcidInserts run first, i.e. by commenting out all the tests that precede it in tests/stress/test_acid_stress.py (see the diff at the end of the log below).
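      For reference, the invariant the checker enforces is the one visible in the traceback further below: each writer with id wid inserts rows (wid, 0), (wid, 1), (wid, 2), ... in order, so in any consistent snapshot the sorted i-values for a given wid must form a contiguous, duplicate-free run. A minimal sketch of that check, simplified from verify_result_set in tests/stress/test_acid_stress.py (the grouping structure here is illustrative; only the final assert is verbatim from the traceback):

      def verify_result_set(result):
          # Group the i-values of "select * from <table>" by writer id.
          runs = {}
          for line in result.data:
              wid, i = map(int, line.split('\t'))
              runs.setdefault(wid, []).append(i)
          for wid, i_list in runs.items():
              sorted_run = sorted(i_list)
              # Every writer inserts i = 0, 1, 2, ... exactly once, so the
              # sorted values must equal a gap-free range. A duplicated row
              # such as (2, 2) makes the list longer and fails the assert.
              # (Python 2: range() returns a list, so the comparison holds.)
              assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), \
                  "wid: %d" % wid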

      Setup:

      1. Build Impala and reformat the metastore in case HMS is in a bad state: $IMPALA_HOME/buildall.sh -format_metastore -notests
      2. Start the cluster in local catalog mode: $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args --use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal --catalogd_args --hms_event_polling_interval_s=1
      3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test $IMPALA_TESTS/stress/test_acid_stress.py

      Error reported:

      [09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
      rootLoggerLevel = INFO
      ================================================== test session starts ===================================================
      platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 -- /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
      cachedir: tests/.cache
      rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
      plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
      timeout: 7200s method: signal
      collected 2 items 
      
      tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0] FAILED
      tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0] PASSED
      ================================================ short test summary info =================================================
      FAIL tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]
      
      ======================================================== FAILURES ========================================================
      __________________________ TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0] ___________________________
      tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
          run_tasks(writers + checkers)
      tests/stress/stress_util.py:45: in run_tasks
          pool.map_async(Task.run, tasks).get(timeout_seconds)
      ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572: in get
          raise self._value
      E   AssertionError: wid: 2
      E   assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
      E     At index 3 diff: 2 != 3
      E     Left contains more items, first extra item: 4
      E     Full diff:
      E     - [0, 1, 2, 2, 3, 4]
      E     ?           ---
      E     + [0, 1, 2, 3, 4]
      ------------------------------------------------- Captured stderr setup --------------------------------------------------
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- connecting to: localhost:21000
      -- connecting to localhost:21050 with impyla
      -- 2022-03-16 09:20:54,762 INFO     MainThread: Closing active operation
      -- connecting to localhost:28000 with impyla
      -- 2022-03-16 09:20:54,774 INFO     MainThread: Closing active operation
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET sync_ddl=True;
      -- executing against localhost:21000
      
      DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;
      
      -- 2022-03-16 09:20:54,808 INFO     MainThread: Started query 28457f4c7e77cdec:c6d3731900000000
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET sync_ddl=True;
      -- executing against localhost:21000
      
      CREATE DATABASE `test_concurrent_inserts_8933345c`;
      
      -- 2022-03-16 09:20:54,877 INFO     MainThread: Started query 374bf99aea680523:48d2405400000000
      -- 2022-03-16 09:21:01,164 INFO     MainThread: Created database "test_concurrent_inserts_8933345c" for test ID "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
      -------------------------------------------------- Captured stderr call --------------------------------------------------
      SET SYNC_DDL=true;
      -- executing against localhost:21000
      
      drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:01,173 INFO     MainThread: Started query 20480c2a1d336d35:c2d84edd00000000
      -- executing against localhost:21000
      
      create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid int, i int) TBLPROPERTIES (
              'transactional_properties' = 'insert_only', 'transactional' = 'true')
              ;
      
      -- 2022-03-16 09:21:01,294 INFO     MainThread: Started query 754969473483b4e9:acfc852300000000
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- connecting to: localhost:21000
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- connecting to: localhost:21001
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- executing against localhost:21000
      
      -- connecting to: localhost:21000
      -- connecting to: localhost:21002
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- connecting to: localhost:21001
      SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
      -- connecting to: localhost:21002
      -- connecting to: localhost:21000
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (0, 0);
      
      -- executing against localhost:21001
      
      -- connecting to: localhost:21002
      -- executing against localhost:21002
      
      -- executing against localhost:21000
      
      -- connecting to: localhost:21001
      -- executing against localhost:21001
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (1, 0);
      
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 0);
      
      -- executing against localhost:21000
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (3, 0);
      
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (4, 0);
      
      -- executing against localhost:21001
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (5, 0);
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:07,824 INFO     Thread-3: Started query 6f4e9d38d1723252:a6a7368f00000000
      -- 2022-03-16 09:21:07,859 INFO     Thread-4: Started query 33423e0145b7d3e0:ea8d626200000000
      -- 2022-03-16 09:21:07,861 INFO     Thread-8: Started query 4046d8edde82931b:aa11c84800000000
      -- 2022-03-16 09:21:07,875 INFO     Thread-9: Started query 594d63b7814c31ab:8f2b92ca00000000
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 1);
      
      -- 2022-03-16 09:21:08,229 INFO     Thread-4: Started query a94dab601c125bb4:8988934300000000
      -- executing against localhost:21000
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 2);
      
      -- 2022-03-16 09:21:08,384 INFO     Thread-3: Started query 4040597eba7beb36:4cfa08c800000000
      -- 2022-03-16 09:21:08,409 INFO     Thread-4: Started query 854e1aee63575861:dbf8bafb00000000
      -- executing against localhost:21002
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- executing against localhost:21001
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:08,435 INFO     Thread-9: Started query 7e4d4cf54a44cf03:868efcb100000000
      -- 2022-03-16 09:21:08,456 INFO     Thread-8: Started query 4645788cf2a4d401:0aea969e00000000
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 3);
      
      -- 2022-03-16 09:21:08,586 INFO     Thread-4: Started query 4e4cdd976dfa3358:6c455b7300000000
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 4);
      
      -- 2022-03-16 09:21:08,711 INFO     Thread-4: Started query 6f46942eb8807e5f:ffbb146000000000
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 5);
      
      -- executing against localhost:21000
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:08,959 INFO     Thread-3: Started query d940fa405a776bef:3705218900000000
      -- 2022-03-16 09:21:08,977 INFO     Thread-4: Started query f947061a7c45901c:12b2ba9d00000000
      -- executing against localhost:21001
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:08,997 INFO     Thread-9: Started query f54a0aa01bbb2f86:3b89ae9600000000
      -- executing against localhost:21002
      
      select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;
      
      -- 2022-03-16 09:21:09,147 INFO     Thread-8: Started query fa418523bc0552ce:f0e49b1a00000000
      -- executing against localhost:21002
      
      insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 6);
      
      -- 2022-03-16 09:21:09,250 INFO     Thread-4: Started query fc4751d49d1a1aa2:4bcb48d600000000
      -- closing connection to: localhost:21002
      Traceback (most recent call last):
        File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in run
          return self.func(*self.args, **self.kwargs)
        File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 276, in _impala_role_concurrent_checker
          verify_result_set(result)
        File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 269, in verify_result_set
          assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" % wid
      AssertionError: wid: 2
      assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
        At index 3 diff: 2 != 3
        Left contains more items, first extra item: 4
        Full diff:
        - [0, 1, 2, 2, 3, 4]
        ?           ---
        + [0, 1, 2, 3, 4]
      ========================================== 1 failed, 1 passed in 158.88 seconds ==========================================
      [09:23:33 qchen@qifan-10229: Impala.03112022] git branch
        IMPALA-10992-auto-scaling-planner-support
      * master
      
      [09:23:41 qchen@qifan-10229: Impala.03112022] git diff
      diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
      index a921bb961..6df592f3f 100644
      --- a/fe/src/main/java/org/apache/impala/service/Frontend.java
      +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
      @@ -1644,6 +1644,7 @@ public class Frontend {
               markTimelineRetries(attempt, retryMsg, timeline);
               return req;
             } catch (InconsistentMetadataFetchException e) {
      +        LOG.error("BAR: catch an InconsistentMetadataFetchException");
               if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
                 markTimelineRetries(attempt, e.getMessage(), timeline);
                 throw e;
      @@ -1819,14 +1820,18 @@ public class Frontend {
           } catch (Exception e) {
             if (queryCtx.isSetTransaction_id()) {
               try {
      +          LOG.error("BAR: to abort Transaction");
                 abortTransaction(queryCtx.getTransaction_id());
      +          LOG.error("BAR: Transaction aborted");
                 timeline.markEvent("Transaction aborted");
               } catch (TransactionException te) {
                 LOG.error("Could not abort transaction because: " + te.getMessage());
               }
             } else if (queryCtx.isIs_kudu_transactional()) {
               try {
      +          LOG.error("BAR: to abort kudu Transaction");
                 abortKuduTransaction(queryCtx.getQuery_id());
      +          LOG.error("BAR: kudu Transaction aborted");
                 timeline.markEvent(
                     "Kudu transaction aborted: " + queryCtx.getQuery_id().toString());
               } catch (TransactionException te) {
      diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py
      index f6439ff3c..09ec874e7 100644
      --- a/tests/stress/test_acid_stress.py
      +++ b/tests/stress/test_acid_stress.py
      @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
                          v.get_value('table_format').compression_codec == 'none'))
       
       
      -class TestAcidInsertsBasic(TestAcidStress):
      -  @classmethod
      -  def get_workload(self):
      -    return super(TestAcidInsertsBasic, self).get_workload()
      -
      -  @classmethod
      -  def add_test_dimensions(cls):
      -    super(TestAcidInsertsBasic, cls).add_test_dimensions()
      -
      -  def _verify_result(self, result, expected_result):
      -    """Verify invariants for 'run' and 'i'."""
      -    assert len(result.data) > 0
      -    run_max = -1
      -    i_list = []
      -    for line in result.data:
      -      [run, i] = map(int, (line.split('\t')))
      -      run_max = max(run_max, run)
      -      i_list.append(i)
      -    assert expected_result["run"] <= run_max  # shouldn't see data overwritten in the past
      -    i_list.sort()
      -    if expected_result["run"] < run_max:
      -      expected_result["run"] = run_max
      -      expected_result["i"] = 0
      -      return
      -    assert i_list[-1] >= expected_result["i"]
      -    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values from 0 to max_i
      -    expected_result["i"] = i_list[-1]
      -
      -  def _hive_role_write_inserts(self, tbl_name, partitioned):
      -    """INSERT INTO/OVERWRITE a table several times from Hive."""
      -    part_expr = "partition (p=1)" if partitioned else ""
      -    for run in xrange(0, NUM_OVERWRITES):
      -      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
      -          """ % (tbl_name, part_expr, run, 0)
      -      self.run_stmt_in_hive(OVERWRITE_SQL)
      -      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
      -        INSERT_SQL = """insert into table %s %s values (%i, %i)
      -            """ % (tbl_name, part_expr, run, i)
      -        self.run_stmt_in_hive(INSERT_SQL)
      -
      -  def _impala_role_write_inserts(self, tbl_name, partitioned):
      -    """INSERT INTO/OVERWRITE a table several times from Impala."""
      -    try:
      -      impalad_client = ImpalaTestSuite.create_impala_client()
      -      part_expr = "partition (p=1)" if partitioned else ""
      -      for run in xrange(0, NUM_OVERWRITES + 1):
      -        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
      -            """ % (tbl_name, part_expr, run, 0)
      -        impalad_client.execute(OVERWRITE_SQL)
      -        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
      -          INSERT_SQL = """insert into table %s %s values (%i, %i)
      -              """ % (tbl_name, part_expr, run, i)
      -          impalad_client.execute(INSERT_SQL)
      -    finally:
      -      impalad_client.close()
      -
      -  def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
      -    """SELECT from a table many times until the expected final values are found."""
      -    try:
      -      impalad_client = ImpalaTestSuite.create_impala_client()
      -      expected_result = {"run": -1, "i": 0}
      -      accept_empty_table = True
      -      while expected_result["run"] != NUM_OVERWRITES and \
      -          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
      -        time.sleep(sleep_seconds)
      -        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
      -        result = impalad_client.execute("select run, i from %s" % tbl_name)
      -        if len(result.data) == 0:
      -          assert accept_empty_table
      -          continue
      -        accept_empty_table = False
      -        self._verify_result(result, expected_result)
      -    finally:
      -      impalad_client.close()
      -
      -  def _create_table(self, full_tbl_name, partitioned):
      -    """Creates test table with name 'full_tbl_name'. Table is partitioned if
      -    'partitioned' is set to True."""
      -    part_expr = "partitioned by (p int)" if partitioned else ""
      -
      -    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
      -         'transactional_properties' = 'insert_only', 'transactional' = 'true')
      -         """ % (full_tbl_name, part_expr)
      -    self.client.execute("drop table if exists %s" % full_tbl_name)
      -    self.client.execute(CREATE_SQL)
      -
      -  def _run_test_read_hive_inserts(self, unique_database, partitioned):
      -    """Check that Impala can read a single insert only ACID table (over)written by Hive
      -    several times. Consistency can be checked by using incremental values for
      -    overwrites ('run') and inserts ('i').
      -    """
      -    tbl_name = "%s.test_read_hive_inserts" % unique_database
      -    self._create_table(tbl_name, partitioned)
      -
      -    run_tasks([
      -        Task(self._hive_role_write_inserts, tbl_name, partitioned),
      -        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
      -           sleep_seconds=3)])
      -
      -  def _run_test_read_impala_inserts(self, unique_database, partitioned):
      -    """Check that Impala can read a single insert only ACID table (over)written by Hive
      -    several times. Consistency can be checked by using incremental values for
      -    overwrites ('run') and inserts ('i').
      -    """
      -    tbl_name = "%s.test_read_impala_inserts" % unique_database
      -    self._create_table(tbl_name, partitioned)
      -
      -    run_tasks([
      -        Task(self._impala_role_write_inserts, tbl_name, partitioned),
      -        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
      -           sleep_seconds=0.1)])
      -
      -  @SkipIfHive2.acid
      -  @SkipIfS3.hive
      -  @SkipIfGCS.hive
      -  @SkipIfCOS.hive
      -  @pytest.mark.execute_serially
      -  @pytest.mark.stress
      -  def test_read_hive_inserts(self, unique_database):
      -    """Check that Impala can read partitioned and non-partitioned ACID tables
      -    written by Hive."""
      -    for is_partitioned in [False, True]:
      -      self._run_test_read_hive_inserts(unique_database, is_partitioned)
      -
      -  @SkipIfHive2.acid
      -  @pytest.mark.execute_serially
      -  @pytest.mark.stress
      -  def test_read_impala_inserts(self, unique_database):
      -    """Check that Impala can read partitioned and non-partitioned ACID tables
      -    written by Hive."""
      -    for is_partitioned in [False, True]:
      -      self._run_test_read_impala_inserts(unique_database, is_partitioned)
      -
      -  def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec):
      -    insert_op = "OVERWRITE" if is_overwrite else "INTO"
      -    try:
      -      impalad_client = ImpalaTestSuite.create_impala_client()
      -      impalad_client.execute(
      -          """insert {op} table {tbl_name} partition({partition})
      -             select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name,
      -          partition=partition, sleep_ms=sleep_sec * 1000))
      -    finally:
      -      impalad_client.close()
      -
      -  @pytest.mark.execute_serially
      -  @pytest.mark.stress
      -  @SkipIf.not_hdfs
      -  @UniqueDatabase.parametrize(sync_ddl=True)
      -  def test_partitioned_inserts(self, unique_database):
      -    """Check that the different ACID write operations take appropriate locks.
      -       INSERT INTO: should take a shared lock
      -       INSERT OVERWRITE: should take an exclusive lock
      -       Both should take PARTITION-level lock in case of static partition insert."""
      -    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
      -    self.client.set_configuration_option("SYNC_DDL", "true")
      -    self.client.execute("""
      -        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
      -        TBLPROPERTIES(
      -        'transactional_properties'='insert_only','transactional'='true')""".format(
      -        tbl_name))
      -    # Warmup INSERT
      -    self.execute_query("alter table {0} add partition(p=0,q=0)".format(tbl_name))
      -    sleep_sec = 5
      -    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
      -        "p=1,q=1", False, sleep_sec)
      -    # INSERT INTO the same partition can run in parallel.
      -    duration = run_tasks([task_insert_into, task_insert_into])
      -    assert duration < 3 * sleep_sec
      -    task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
      -      "p=1,q=1", True, sleep_sec)
      -    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
      -    duration = run_tasks([task_insert_into, task_insert_overwrite])
      -    assert duration > 4 * sleep_sec
      -    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
      -    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
      -    assert duration > 4 * sleep_sec
      -    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, tbl_name,
      -      "p=1,q=2", True, sleep_sec)
      -    # INSERT OVERWRITEs to different partitions can run in parallel.
      -    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
      -    assert duration < 3 * sleep_sec
      -
      +#class TestAcidInsertsBasic(TestAcidStress):
      +#  @classmethod
      +#  def get_workload(self):
      +#    return super(TestAcidInsertsBasic, self).get_workload()
      +#
      +#  @classmethod
      +#  def add_test_dimensions(cls):
      +#    super(TestAcidInsertsBasic, cls).add_test_dimensions()
      +#
      +#  def _verify_result(self, result, expected_result):
      +#    """Verify invariants for 'run' and 'i'."""
      +#    assert len(result.data) > 0
      +#    run_max = -1
      +#    i_list = []
      +#    for line in result.data:
      +#      [run, i] = map(int, (line.split('\t')))
      +#      run_max = max(run_max, run)
      +#      i_list.append(i)
      +#    assert expected_result["run"] <= run_max  # shouldn't see data overwritten in the past
      +#    i_list.sort()
      +#    if expected_result["run"] < run_max:
      +#      expected_result["run"] = run_max
      +#      expected_result["i"] = 0
      +#      return
      +#    assert i_list[-1] >= expected_result["i"]
      +#    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values from 0 to max_i
      +#    expected_result["i"] = i_list[-1]
      +#
      +#  def _hive_role_write_inserts(self, tbl_name, partitioned):
      +#    """INSERT INTO/OVERWRITE a table several times from Hive."""
      +#    part_expr = "partition (p=1)" if partitioned else ""
      +#    for run in xrange(0, NUM_OVERWRITES):
      +#      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
      +#          """ % (tbl_name, part_expr, run, 0)
      +#      self.run_stmt_in_hive(OVERWRITE_SQL)
      +#      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
      +#        INSERT_SQL = """insert into table %s %s values (%i, %i)
      +#            """ % (tbl_name, part_expr, run, i)
      +#        self.run_stmt_in_hive(INSERT_SQL)
      +#
      +#  def _impala_role_write_inserts(self, tbl_name, partitioned):
      +#    """INSERT INTO/OVERWRITE a table several times from Impala."""
      +#    try:
      +#      impalad_client = ImpalaTestSuite.create_impala_client()
      +#      part_expr = "partition (p=1)" if partitioned else ""
      +#      for run in xrange(0, NUM_OVERWRITES + 1):
      +#        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
      +#            """ % (tbl_name, part_expr, run, 0)
      +#        impalad_client.execute(OVERWRITE_SQL)
      +#        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
      +#          INSERT_SQL = """insert into table %s %s values (%i, %i)
      +#              """ % (tbl_name, part_expr, run, i)
      +#          impalad_client.execute(INSERT_SQL)
      +#    finally:
      +#      impalad_client.close()
      +#
      +#  def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
      +#    """SELECT from a table many times until the expected final values are found."""
      +#    try:
      +#      impalad_client = ImpalaTestSuite.create_impala_client()
      +#      expected_result = {"run": -1, "i": 0}
      +#      accept_empty_table = True
      +#      while expected_result["run"] != NUM_OVERWRITES and \
      +#          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
      +#        time.sleep(sleep_seconds)
      +#        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
      +#        result = impalad_client.execute("select run, i from %s" % tbl_name)
      +#        if len(result.data) == 0:
      +#          assert accept_empty_table
      +#          continue
      +#        accept_empty_table = False
      +#        self._verify_result(result, expected_result)
      +#    finally:
      +#      impalad_client.close()
      +#
      +#  def _create_table(self, full_tbl_name, partitioned):
      +#    """Creates test table with name 'full_tbl_name'. Table is partitioned if
      +#    'partitioned' is set to True."""
      +#    part_expr = "partitioned by (p int)" if partitioned else ""
      +#
      +#    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
      +#         'transactional_properties' = 'insert_only', 'transactional' = 'true')
      +#         """ % (full_tbl_name, part_expr)
      +#    self.client.execute("drop table if exists %s" % full_tbl_name)
      +#    self.client.execute(CREATE_SQL)
      +#
      +#  def _run_test_read_hive_inserts(self, unique_database, partitioned):
      +#    """Check that Impala can read a single insert only ACID table (over)written by Hive
      +#    several times. Consistency can be checked by using incremental values for
      +#    overwrites ('run') and inserts ('i').
      +#    """
      +#    tbl_name = "%s.test_read_hive_inserts" % unique_database
      +#    self._create_table(tbl_name, partitioned)
      +#
      +#    run_tasks([
      +#        Task(self._hive_role_write_inserts, tbl_name, partitioned),
      +#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
      +#           sleep_seconds=3)])
      +#
      +#  def _run_test_read_impala_inserts(self, unique_database, partitioned):
      +#    """Check that Impala can read a single insert only ACID table (over)written by Hive
      +#    several times. Consistency can be checked by using incremental values for
      +#    overwrites ('run') and inserts ('i').
      +#    """
      +#    tbl_name = "%s.test_read_impala_inserts" % unique_database
      +#    self._create_table(tbl_name, partitioned)
      +#
      +#    run_tasks([
      +#        Task(self._impala_role_write_inserts, tbl_name, partitioned),
      +#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
      +#           sleep_seconds=0.1)])
      +#
      +#  @SkipIfHive2.acid
      +#  @SkipIfS3.hive
      +#  @SkipIfGCS.hive
      +#  @SkipIfCOS.hive
      +#  @pytest.mark.execute_serially
      +#  @pytest.mark.stress
      +#  def test_read_hive_inserts(self, unique_database):
      +#    """Check that Impala can read partitioned and non-partitioned ACID tables
      +#    written by Hive."""
      +#    for is_partitioned in [False, True]:
      +#      self._run_test_read_hive_inserts(unique_database, is_partitioned)
      +#
      +#  @SkipIfHive2.acid
      +#  @pytest.mark.execute_serially
      +#  @pytest.mark.stress
      +#  def test_read_impala_inserts(self, unique_database):
      +#    """Check that Impala can read partitioned and non-partitioned ACID tables
      +#    written by Hive."""
      +#    for is_partitioned in [False, True]:
      +#      self._run_test_read_impala_inserts(unique_database, is_partitioned)
      +#
      +#  def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec):
      +#    insert_op = "OVERWRITE" if is_overwrite else "INTO"
      +#    try:
      +#      impalad_client = ImpalaTestSuite.create_impala_client()
      +#      impalad_client.execute(
      +#          """insert {op} table {tbl_name} partition({partition})
      +#             select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name,
      +#          partition=partition, sleep_ms=sleep_sec * 1000))
      +#    finally:
      +#      impalad_client.close()
      +#
      +#  @pytest.mark.execute_serially
      +#  @pytest.mark.stress
      +#  @SkipIf.not_hdfs
      +#  @UniqueDatabase.parametrize(sync_ddl=True)
      +#  def test_partitioned_inserts(self, unique_database):
      +#    """Check that the different ACID write operations take appropriate locks.
      +#       INSERT INTO: should take a shared lock
      +#       INSERT OVERWRITE: should take an exclusive lock
      +#       Both should take PARTITION-level lock in case of static partition insert."""
      +#    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
      +#    self.client.set_configuration_option("SYNC_DDL", "true")
      +#    self.client.execute("""
      +#        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
      +#        TBLPROPERTIES(
      +#        'transactional_properties'='insert_only','transactional'='true')""".format(
      +#        tbl_name))
      +#    # Warmup INSERT
      +#    self.execute_query("alter table {0} add partition(p=0,q=0)".format(tbl_name))
      +#    sleep_sec = 5
      +#    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
      +#        "p=1,q=1", False, sleep_sec)
      +#    # INSERT INTO the same partition can run in parallel.
      +#    duration = run_tasks([task_insert_into, task_insert_into])
      +#    assert duration < 3 * sleep_sec
      +#    task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
      +#      "p=1,q=1", True, sleep_sec)
      +#    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
      +#    duration = run_tasks([task_insert_into, task_insert_overwrite])
      +#    assert duration > 4 * sleep_sec
      +#    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
      +#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
      +#    assert duration > 4 * sleep_sec
      +#    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, tbl_name,
      +#      "p=1,q=2", True, sleep_sec)
      +#    # INSERT OVERWRITEs to different partitions can run in parallel.
      +#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
      +#    assert duration < 3 * sleep_sec
      +#
       
       class TestConcurrentAcidInserts(TestAcidStress):
         @classmethod
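      Note that the git diff above is debug instrumentation plus a test reordering, not a fix: it adds LOG.error("BAR: ...") tracing around two Frontend.java code paths and comments out TestAcidInsertsBasic so that TestConcurrentAcidInserts runs first. For orientation, the two instrumented paths follow the pattern below (a simplified Python rendering of the Java logic; InconsistentMetadataFetchException and INCONSISTENT_METADATA_NUM_RETRIES come from the diff, while the helper names, stub arguments, and the retry constant's value are illustrative):

      INCONSISTENT_METADATA_NUM_RETRIES = 40  # illustrative; see Frontend.java

      class InconsistentMetadataFetchException(Exception):
          pass

      def get_exec_request_with_retries(plan_fn):
          # Path 1 (first hunk of the diff): planning is retried when a
          # local-catalog metadata fetch turns out to be inconsistent, up
          # to a fixed number of attempts, then the exception is surfaced.
          attempt = 0
          while True:
              try:
                  return plan_fn()
              except InconsistentMetadataFetchException:
                  if attempt == INCONSISTENT_METADATA_NUM_RETRIES:
                      raise
                  attempt += 1

      def execute_with_txn_cleanup(execute_fn, transaction_id, abort_fn):
          # Path 2 (second hunk): if a statement fails while an ACID
          # transaction is open, the transaction is aborted before the
          # error propagates ("Transaction aborted" in the timeline).
          try:
              return execute_fn()
          except Exception:
              if transaction_id is not None:
                  abort_fn(transaction_id)
              raise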
      
      

      People

        Assignee: Unassigned
        Reporter: Qifan Chen (sql_forever)
        Votes: 0
        Watchers: 5