Beam / BEAM-8226

Python streaming pipeline getting stuck on Dataflow

Details

    Description

      Python streaming pipelines are getting stuck with the following error when running on Dataflow.

       

      Relevant thread stack for Java
      --- Threads (4): [Thread[Thread-19,1,main], Thread[Thread-20,1,main], Thread[Thread-21,1,main], Thread[Thread-22,1,main]] State: WAITING stack: ---
        sun.misc.Unsafe.park(Native Method)
        java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
        org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

      For Python
      --- Thread #139819623634688 name: ThreadPoolExecutor-1_0 ---
        File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
          self._bootstrap_inner()
        File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
          self.run()
        File "/usr/local/lib/python3.6/threading.py", line 864, in run
          self._target(*self._args, **self._kwargs)
        File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 69, in _worker
          work_item.run()
        File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in task
          self._execute(lambda: worker.do_instruction(work), work)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
          response = task()
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
          self._execute(lambda: worker.do_instruction(work), work)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
          request.instruction_id)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 661, in process_bundle
          instruction_id, expected_transforms):
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 213, in input_elements
          data = received.get(timeout=1)
        File "/usr/local/lib/python3.6/queue.py", line 173, in get
          self.not_empty.wait(remaining)
        File "/usr/local/lib/python3.6/threading.py", line 299, in wait
          gotit = waiter.acquire(True, timeout)
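
      The Python trace above ends in a `received.get(timeout=1)` call, a polling wait on a queue. The following is a minimal sketch of that pattern, not the actual Beam implementation: it starts a consumer that polls an empty queue and then dumps all thread stacks with the standard library, which produces a trace of the same shape. The names `input_elements`, `dump_thread_stacks`, `run_demo`, the `None` sentinel, and the shortened timeout are assumptions made for illustration.

```python
import queue
import sys
import threading
import time
import traceback


def input_elements(received, stop_event, poll_timeout=0.05):
    """Yield items from `received`, polling with a timeout on each wait."""
    while not stop_event.is_set():
        try:
            data = received.get(timeout=poll_timeout)
        except queue.Empty:
            # Nothing arrived within the timeout; go back to waiting.
            continue
        if data is None:  # hypothetical end-of-stream sentinel
            return
        yield data


def dump_thread_stacks():
    """Return a text dump of the current stack of every live thread."""
    names = {t.ident: t.name for t in threading.enumerate()}
    lines = []
    for ident, frame in sys._current_frames().items():
        lines.append("--- Thread #%d name: %s ---" % (ident, names.get(ident, "unknown")))
        lines.extend(entry.rstrip("\n") for entry in traceback.format_stack(frame))
    return "\n".join(lines)


def run_demo():
    """Start a consumer with no producer, dump stacks, then shut down."""
    received = queue.Queue()
    stop_event = threading.Event()
    consumed = []
    worker = threading.Thread(
        target=lambda: consumed.extend(input_elements(received, stop_event)),
        name="stuck-consumer",
    )
    worker.start()
    time.sleep(0.2)  # give the worker time to block inside get()
    dump = dump_thread_stacks()
    stop_event.set()  # let the polling loop exit
    worker.join()
    return dump, consumed


if __name__ == "__main__":
    stack_dump, _ = run_demo()
    print(stack_dump)
```

      A thread sitting in `get(timeout=...)` only shows that no data had arrived at the moment of the dump. Read together with the Java trace, where the writer thread is parked in `RemoteGrpcPortWriteOperation.maybeWait`, it suggests that neither side is making progress, but the sketch does not by itself establish the cause.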


          People

            yichi Yichi Zhang
            angoenka Ankur Goenka


              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 1h 50m