Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6595

Kafka connect commit offset incorrectly.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 0.10.2.0
    • None
    • connect
    • None

    Description

      Version: ConfluentPlatform Kafka 3.2.0

      SourceTaskOffsetComitter calls commitOffset() and waits for all incomplete records to be sent. While the task is stopped, commitOffset() is called again by the final block in WorkerSourceTask.execute(), it will throw Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this exception. This will trigger closing Producer without waiting the flush timeout.

      After 30 seconds, all incomplete records has been forcefully aborted. If the offset.flush.timeout.ms is configured larger than 30 seconds, WorkerSourceTask will consider those aborted records as sent within flush timeout, which results in incorrectly flushing the source offset.

       

      // code placeholder
      
      2018-02-27 02:59:33,134 INFO  [] Stopping connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
      
      2018-02-27 02:59:33,134 INFO  [] Stopped connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
      
      
      
      2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this   [pool-1-thread-13][OffsetStorageWriter.java:110]
      
      2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 threw an uncaught and unrecoverable exception   [pool-1-thread-13][WorkerTask.java:141]
      
      org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
      
              at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
      
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
      
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
      
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
      
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
      
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      
              at java.lang.Thread.run(Thread.java:745)
      
      2018-02-27 03:00:00,734 ERROR [] Graceful stop of task dp-sqlserver-connector-dptask_455-0 failed.   [pool-3-thread-1][Worker.java:405]
      
      2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since pending requests could not be completed within timeout 30 ms.   [pool-1-thread-13][KafkaProducer.java:713]
      
      2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
      
      java.lang.IllegalStateException: Producer is closed forcefully.
      
              at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
      
              at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
      
              at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
      
              at java.lang.Thread.run(Thread.java:745)
      
      2018-02-27 03:00:09,920 INFO  [] Finished WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
      

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            HanlinLiu Hanlin Liu
            Votes:
            3 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated: