  1. Beam
  2. BEAM-4024

BundleBasedDirectRunner fails with recent side input changes

Details

    • Type: Improvement
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

      Recent changes added support for streaming side inputs to the Beam Python DirectRunner (https://github.com/apache/incubator-beam/pull/4838 and https://github.com/apache/beam/pull/4949). However, these changes break pipelines that explicitly use the BundleBasedDirectRunner. When the Beam Python tests are run with the BundleBasedDirectRunner (i.e., with the FnApiRunner disabled), the following failures occur:

      ======================================================================
      FAIL: test_empty_write (apache_beam.io.filebasedsink_test.TestFileBasedSink)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/io/filebasedsink_test.py", line 159, in test_empty_write
      p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/pipeline.py", line 409, in __exit__
      self.run().wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 104, in run
      state = result.wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 421, in wait_until_finish
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 360, in await_completion
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 406, in await_completion
      six.reraise(t, v, tb)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError:
      -------------------- >> begin captured logging << --------------------
      root: INFO: Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      root: INFO: Running pipeline with DirectRunner.
      root: DEBUG: Creating 1 empty shard(s).
      root: INFO: Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
      root: INFO: Renamed 1 shards in 0.11 seconds.
      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd4bda950>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd4bda950>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd4bda950>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd4bda950>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Giving up after 4 attempts.
      root: WARNING: A task failed with exception:
      --------------------- >> end captured logging << ---------------------
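
The captured log above shows the same bundle failing four times on the same `assert not view.has_result` line before the executor gives up. The following is a toy model of that pattern, not the Beam implementation: `SideInputView`, `add_values`, and `run_with_retries` are hypothetical stand-ins, and the assumption that a view is marked complete on its first write is mine, not something the report establishes. It only illustrates why retrying cannot help once a write-once guard has been tripped.

```python
# Toy model only: SideInputView, add_values and run_with_retries are
# hypothetical stand-ins, not the actual Beam classes or functions.


class SideInputView:
    """Holds the values of one side input; meant to be filled exactly once."""

    def __init__(self):
        self.has_result = False
        self.elements = []


def add_values(view, values):
    # Same guard as the failing line in the traceback.
    assert not view.has_result
    view.elements.extend(values)
    # Assumption: the view is treated as complete after its first write.
    view.has_result = True


def run_with_retries(view, values, max_attempts=4):
    """Retry committing a bundle; return how many attempts failed."""
    failures = 0
    for _ in range(max_attempts):
        try:
            add_values(view, values)
            return failures
        except AssertionError:
            # The view's state persists across attempts, so every retry
            # hits the same assertion.
            failures += 1
    return failures


view = SideInputView()
add_values(view, [])                 # first (empty) bundle completes the view
failed = run_with_retries(view, [])  # a second bundle for the same view
print(failed)                        # prints 4
```

Because the guard depends on state that outlives each attempt, all four retries fail identically, which is consistent with the repeated tracebacks and the "Giving up after 4 attempts" message in the log.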

      ======================================================================
      FAIL: test_static_value_provider_empty_write (apache_beam.io.filebasedsink_test.TestFileBasedSink)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/io/filebasedsink_test.py", line 172, in test_static_value_provider_empty_write
      p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/pipeline.py", line 409, in __exit__
      self.run().wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 104, in run
      state = result.wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 421, in wait_until_finish
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 360, in await_completion
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 406, in await_completion
      six.reraise(t, v, tb)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError:
      -------------------- >> begin captured logging << --------------------
      root: INFO: Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      root: INFO: Running pipeline with DirectRunner.
      root: DEBUG: Creating 1 empty shard(s).
      root: INFO: Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
      root: INFO: Renamed 1 shards in 0.12 seconds.
      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcd7a8488>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcd7a8488>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcd7a8488>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcd7a8488>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Giving up after 4 attempts.
      root: WARNING: A task failed with exception:
      --------------------- >> end captured logging << ---------------------

      ======================================================================
      FAIL: test_combine_globally_with_default (apache_beam.transforms.combiners_test.CombineTest)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/transforms/combiners_test.py", line 294, in test_combine_globally_with_default
      assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/pipeline.py", line 409, in __exit__
      self.run().wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 104, in run
      state = result.wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 421, in wait_until_finish
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 360, in await_completion
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 406, in await_completion
      six.reraise(t, v, tb)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError:
      -------------------- >> begin captured logging << --------------------
      root: INFO: Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      root: INFO: Running pipeline with DirectRunner.
      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fceb35dd0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fceb35dd0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fceb35dd0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fceb35dd0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Giving up after 4 attempts.
      root: WARNING: A task failed with exception:
      --------------------- >> end captured logging << ---------------------

      ======================================================================
      FAIL: test_combine_globally_with_default_side_input (apache_beam.transforms.combiners_test.CombineTest)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/transforms/combiners_test.py", line 312, in test_combine_globally_with_default_side_input
      assert_that(result2, equal_to([10]), label='r2')
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/pipeline.py", line 409, in __exit__
      self.run().wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 104, in run
      state = result.wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 421, in wait_until_finish
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 360, in await_completion
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 406, in await_completion
      six.reraise(t, v, tb)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError:
      -------------------- >> begin captured logging << --------------------
      root: INFO: Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      root: INFO: Running pipeline with DirectRunner.
      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd47254d0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd47254d0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd47254d0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fd47254d0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Giving up after 4 attempts.
      root: WARNING: A task failed with exception:
      --------------------- >> end captured logging << ---------------------

      ======================================================================
      FAIL: test_write_with_empty_pcollection (apache_beam.transforms.write_ptransform_test.WriteTest)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/transforms/write_ptransform_test.py", line 118, in test_write_with_empty_pcollection
      self._run_write_test(data)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/transforms/write_ptransform_test.py", line 108, in _run_write_test
      assert_that(result, is_empty())
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/pipeline.py", line 409, in __exit__
      self.run().wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/testing/test_pipeline.py", line 104, in run
      state = result.wait_until_finish()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 421, in wait_until_finish
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 360, in await_completion
      self._executor.await_completion()
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 406, in await_completion
      six.reraise(t, v, tb)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError:
      -------------------- >> begin captured logging << --------------------
      root: INFO: Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      root: INFO: Running pipeline with DirectRunner.
      root: DEBUG: Creating 1 empty shard(s).
      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcda1aef0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcda1aef0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcda1aef0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f6fcda1aef0>, due to an exception.
      Traceback (most recent call last):
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 307, in call
      side_input_values)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 346, in attempt_call
      self._completion_callback.handle_result(self, self._input_bundle, result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/executor.py", line 238, in handle_result
      input_committed_bundle, self._timer_firings, transform_result)
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 242, in handle_result
      committed_bundle.get_elements_iterable(make_copy=True))
      File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/runners/direct/evaluation_context.py", line 97, in add_values
      assert not view.has_result
      AssertionError

      root: ERROR: Giving up after 4 attempts.
      root: WARNING: A task failed with exception:
      --------------------- >> end captured logging << ---------------------

      ----------------------------------------------------------------------
      Ran 1589 tests in 243.724s

      FAILED (failures=5, skipped=45)
      Test failed: <unittest.runner.TextTestResult run=1589 errors=0 failures=5>
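
The traceback above shows the guard `assert not view.has_result` failing in `add_values` on all four attempts, but it does not show why `has_result` was already set. The sketch below is only an illustration of how a write-once guard of this kind behaves under retries. It is not the Beam implementation: `WriteOnceView`, `finalize` and `commit_with_retries` are hypothetical names, and the retry count of 4 is taken from the "Giving up after 4 attempts" log line.

```python
class WriteOnceView(object):
    """Hypothetical stand-in for a side input view that may be filled once."""

    def __init__(self):
        self.has_result = False
        self.elements = []

    def add_values(self, values):
        # Same style of guard as the assertion seen in the traceback.
        assert not self.has_result
        self.elements.extend(values)

    def finalize(self):
        # Hypothetical: marks the view as complete; later writes are rejected.
        self.has_result = True


def commit_with_retries(view, values, max_attempts=4):
    """Tries add_values up to max_attempts times; returns the failure count."""
    failures = 0
    for _ in range(max_attempts):
        try:
            view.add_values(values)
            return failures
        except AssertionError:
            # The view state persists across attempts, so a retry cannot help.
            failures += 1
    return failures


view = WriteOnceView()
view.add_values([1, 2])
view.finalize()
late_failures = commit_with_retries(view, [3])
```

Because the flag lives on the view rather than on the attempt, every retry in this sketch hits the same assertion, which is consistent with the four identical tracebacks in the captured log.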

      The following error also occurs:

      Exception in thread Thread: 0, ExecutorServiceWorker-0 (executing):
      Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
      self.run()
      File "apache_beam/runners/direct/executor.py", line 92, in run
      task.call()
      File "apache_beam/runners/direct/executor.py", line 302, in call
      main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
      AttributeError: 'TransformExecutor' object has no attribute '_latest_main_input_window'

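      Both failures, the AssertionError raised in add_values and the AttributeError on TransformExecutor, should be fixed so that the BundleBasedDirectRunner works with the new side input support.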
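
The AttributeError above indicates that `call` reads `_latest_main_input_window` on a code path where the attribute was never assigned. A minimal sketch of that general Python failure mode, and of one defensive pattern, follows. The classes here are hypothetical and are not the Beam `TransformExecutor`; whether initializing the attribute in `__init__` is the right fix for Beam is an assumption.

```python
class Executor(object):
    """Hypothetical executor that sets an attribute on only one path."""

    def __init__(self, initialize_window_state):
        if initialize_window_state:
            # Only this construction path defines the attribute.
            self._latest_main_input_window = 'global'

    def call(self):
        # Raises AttributeError if the attribute was never assigned.
        return self._latest_main_input_window


class SaferExecutor(object):
    """Hypothetical variant that always defines the attribute."""

    def __init__(self):
        # None stands for "no main input window seen yet".
        self._latest_main_input_window = None

    def call(self):
        if self._latest_main_input_window is None:
            return None  # nothing to map onto the side input yet
        return self._latest_main_input_window


def fails_with_attribute_error(executor):
    """Returns True if executor.call() raises AttributeError."""
    try:
        executor.call()
    except AttributeError:
        return True
    return False
```

In this sketch, `fails_with_attribute_error(Executor(False))` is true while `SaferExecutor().call()` returns `None`, so callers can distinguish "no window yet" from a programming error.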

            People

              Assignee: Unassigned
              Reporter: Charles Chen (ccy)

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h 40m