Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-6516

Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO

Details

    • Bug
    • Status: Triage Needed
    • P3
    • Resolution: Fixed
    • None
    • 2.33.0
    • io-java-rabbitmq
    • None

    Description

      I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery in my current pipeline. When I submit my pipeline to Dataflow, the messages are propagated correctly to BigQuery, however I'm getting the following log error entry in Dataflow which is repeating itself:

      
      java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
              org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
              org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
              org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              java.lang.Thread.run(Thread.java:745)
      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
              com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
              com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
              org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
              org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
              org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              java.lang.Thread.run(Thread.java:745)
      Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
              com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
              com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
              com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
              com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
              com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
              com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
              com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
              java.lang.Thread.run(Thread.java:745)
      

      It seems to me the issue is ACKing messages:

      reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80
      

      I also see a lot of open channels (~90) in the RabbitMQ management interface (see attached screenshot).

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              edin Edin
              Votes:
              1 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 2h
                  2h