Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-3101

Flink Kafka consumer crashes with NPE when it sees deleted record

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.10.1, 1.0.0
    • 1.0.0
    • Connectors / Kafka
    • None
    • Apache Flink 0.10.1 binary for Hadoop 2.6.0 with Scala 2.10.

    Description

      Kafka allows a records to be deleted from the log by sending a record with key and null payload. Consumers still can see those null values (deletes) before they are compacted (delete retention point).

      Flink Kafka consumer crashes with NPE when it sees such record:

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.Exception
      	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
      

      Attachments

        Activity

          People

            rmetzger Robert Metzger
            sanjar Sanjar Akhmedov
            Votes:
            1 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: