Kafka / KAFKA-6133

NullPointerException in S3 Connector when using rotate.interval.ms

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 0.11.0.0
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

      I just tried out the new rotate.interval.ms feature in the S3 connector to do time-based flushing. I get this NPE on every event:

      [2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
      java.lang.NullPointerException
              at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
              at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
              at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:748)
      [2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
      [2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:748)

      I dug into the S3 connector code a bit, and it looks like the rotate.interval.ms feature only works if you are using the TimeBasedPartitioner. The writer gets the TimestampExtractor from the TimeBasedPartitioner to determine the timestamp of each event and uses that timestamp for the time-based flushing. With any other partitioner there is no extractor, which would explain the NPE in rotateOnTime.

      I'm using a custom partitioner, but I'd still really like to use the rotate.interval.ms feature, using wall clock time to determine the flushing behavior.

      I'd be willing to work on fixing this issue, but I want to confirm that it is actually a bug, and not that the feature was specifically designed to work only with the TimeBasedPartitioner. Even if it is the latter, it should probably not crash with an NPE.
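
      The behavior discussed above can be illustrated with a minimal, self-contained sketch. It is not the connector's actual code: TimestampSource, RotationClock, and RotateSketch are hypothetical names invented for this example, and the assumption is simply that the extractor reference is null whenever the configured partitioner is not time-based. The sketch shows the unguarded call that would raise the NPE, and one possible guard that falls back to wall-clock time.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for a timestamp extractor; NOT the connector's real interface.
interface TimestampSource {
    long extract(long recordTimestampMs);
}

// Hypothetical helper that decides when an open file should be rotated.
final class RotationClock {
    private final TimestampSource extractor; // assumed null for non-time-based partitioners
    private final LongSupplier wallClock;    // fallback time source
    private final long rotateIntervalMs;
    private Long baseTimestampMs;            // time at which the current file was opened

    RotationClock(TimestampSource extractor, LongSupplier wallClock, long rotateIntervalMs) {
        this.extractor = extractor;
        this.wallClock = wallClock;
        this.rotateIntervalMs = rotateIntervalMs;
    }

    // Returns true when the interval has elapsed since the current file was opened.
    boolean shouldRotate(long recordTimestampMs) {
        // Guard: use the extractor if present, otherwise fall back to wall-clock time.
        long now = (extractor != null)
                ? extractor.extract(recordTimestampMs)
                : wallClock.getAsLong();
        if (baseTimestampMs == null) {
            baseTimestampMs = now; // first record opens the file
            return false;
        }
        if (now - baseTimestampMs >= rotateIntervalMs) {
            baseTimestampMs = now; // start timing the next file
            return true;
        }
        return false;
    }
}

final class RotateSketch {
    // Unguarded variant: dereferences the extractor directly and throws
    // NullPointerException when it is null, mirroring the reported failure mode.
    static long unguarded(TimestampSource extractor, long recordTimestampMs) {
        return extractor.extract(recordTimestampMs);
    }

    public static void main(String[] args) {
        long[] fakeNowMs = {0L}; // controllable clock for the demo
        RotationClock clock = new RotationClock(null, () -> fakeNowMs[0], 1000L);
        System.out.println(clock.shouldRotate(0L)); // file just opened
        fakeNowMs[0] = 1500L;
        System.out.println(clock.shouldRotate(0L)); // interval has elapsed
    }
}
```

      In real use the wall clock would be something like System::currentTimeMillis; the injected LongSupplier is only there to make the sketch deterministic. Whether a wall-clock fallback or an explicit configuration error is the right behavior is a design question for the connector maintainers.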

          People

            Assignee: Unassigned
            Reporter: Elizabeth Bennett (zzbennett)
            Votes: 0
            Watchers: 2
