Kafka
  1. Kafka
  2. KAFKA-824

java.lang.NullPointerException in commitOffsets

    Details

    • Type: Bug Bug
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7.2
    • Fix Version/s: None
    • Component/s: consumer
    • Labels:
      None

      Description

      Neha Narkhede

      "Yes, I have. Unfortunately, I never quite around to fixing it. My guess is
      that it is caused due to a race condition between the rebalance thread and
      the offset commit thread when a rebalance is triggered or the client is
      being shutdown. Do you mind filing a bug ?"

      2013/03/25 12:08:32.020 WARN [ZookeeperConsumerConnector] [] 0_lu-ml-test10.bj-1364184411339-7c88f710 exception during commitOffsets
      java.lang.NullPointerException
      at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
      at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
      at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
      at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
      at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
      at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:103)
      at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:251)
      at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:248)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:570)
      at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:248)
      at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:246)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.Pool$$anon$1.foreach(Pool.scala:53)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at kafka.utils.Pool.foreach(Pool.scala:24)
      at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:246)
      at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:232)
      at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:126)
      at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
      at java.lang.Thread.run(Thread.java:722)

      1. ZkClient.0.4.txt
        90 kB
        Artur Denysenko
      2. ZkClient.0.3.txt
        90 kB
        Artur Denysenko
      3. screenshot-1.jpg
        543 kB
        Artur Denysenko

        Activity

        Yonghui Zhao created issue -
        Hide
        Artur Denysenko added a comment -

        I have it as well. No special test case - but it fails periodically ~1 in 10 cases.
        My configuration:

        • zkclient: 0.4
        • kafka: 2.9.2-0.8.1
        • zookeeper: 3.4.5
        Show
        Artur Denysenko added a comment - I have it as well. No special test case - but it fails periodically ~1 in 10 cases. My configuration: zkclient: 0.4 kafka: 2.9.2-0.8.1 zookeeper: 3.4.5
        Hide
        Guozhang Wang added a comment -

        Currently we are only using zkclient 0.3. Could you check if this problem still persists in that version?

        Guozhang

        Show
        Guozhang Wang added a comment - Currently we are only using zkclient 0.3. Could you check if this problem still persists in that version? Guozhang
        Hide
        Artur Denysenko added a comment -

        It fails for both zkclient 0.3 and 0.4. Please see attached files.

        Show
        Artur Denysenko added a comment - It fails for both zkclient 0.3 and 0.4. Please see attached files.
        Artur Denysenko made changes -
        Field Original Value New Value
        Attachment ZkClient.0.3.txt [ 12641643 ]
        Attachment ZkClient.0.4.txt [ 12641644 ]
        Hide
        Jun Rao added a comment -

        Do you see ZK session expiration around that time?

        Show
        Jun Rao added a comment - Do you see ZK session expiration around that time?
        Hide
        Artur Denysenko added a comment -

        I think it's a case then "_connection" is null for some reason: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L308
        Without going deep into hardcore I would override "ZkClient.create" with my method checking "_connection" for null:

        public class KafkaZkClient extends ZkClient{
            public String create(final String path, Object data, final CreateMode mode) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
                if (path == null) {
                    throw new NullPointerException("path must not be null.");
                }
                final byte[] bytes = data == null ? null : serialize(data);
        
                return retryUntilConnected(new Callable<String>() {
        
                    @Override
                    public String call() throws Exception {
                        if(_connection==null) throw new ConnectionLossException(); // FIX HERE <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
                        return _connection.create(path, bytes, mode);
                    }
                });
            }
        }
        
        Show
        Artur Denysenko added a comment - I think it's a case then "_connection" is null for some reason: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L308 Without going deep into hardcore I would override "ZkClient.create" with my method checking "_connection" for null: public class KafkaZkClient extends ZkClient{ public String create( final String path, Object data, final CreateMode mode) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { if (path == null ) { throw new NullPointerException( "path must not be null ." ); } final byte [] bytes = data == null ? null : serialize(data); return retryUntilConnected( new Callable< String >() { @Override public String call() throws Exception { if (_connection== null ) throw new ConnectionLossException(); // FIX HERE <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< return _connection.create(path, bytes, mode); } }); } }
        Hide
        Artur Denysenko added a comment - - edited

        The root of the problem is here: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L690
        The first "return callable.call();" potentially can be called even before "_connection" is initialized.

        So it's better to fix the root.

        Show
        Artur Denysenko added a comment - - edited The root of the problem is here: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L690 The first "return callable.call();" potentially can be called even before "_connection" is initialized. So it's better to fix the root.
        Hide
        Jun Rao added a comment -

        Hmm, not sure if this can happen. The following is what happens in the constructor. _connection is set there and the underlying _zk in ZkConnection is also set.

        public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)

        { _connection = zkConnection; _zkSerializer = zkSerializer; connect(connectionTimeout, this); }
        Show
        Jun Rao added a comment - Hmm, not sure if this can happen. The following is what happens in the constructor. _connection is set there and the underlying _zk in ZkConnection is also set. public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) { _connection = zkConnection; _zkSerializer = zkSerializer; connect(connectionTimeout, this); }
        Hide
        Artur Denysenko added a comment -
        Show
        Artur Denysenko added a comment - Could it be by re-using closed ZkClient? https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L941 It happens in some way.
        Hide
        Jun Rao added a comment -

        ZkClient is only closed when shutdown the consumer connector.

        Show
        Jun Rao added a comment - ZkClient is only closed when shutdown the consumer connector.
        Hide
        Artur Denysenko added a comment - - edited

        Any progress?
        Have a look:

        Show
        Artur Denysenko added a comment - - edited Any progress? Have a look:
        Artur Denysenko made changes -
        Attachment screenshot-1.jpg [ 12643326 ]
        Hide
        Andrew Olson added a comment -

        We are seeing a similar NPE in our application's direct usage of the ZkClient, unrelated to commitOffsets.

        Here are a couple example stack traces.

        Caused by: java.lang.NullPointerException
        	at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
        	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
        	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
        	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        	at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
        	at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
        
        Caused by: java.lang.NullPointerException
        	at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
        	at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
        	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        	at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
        	at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
        	at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
        

        ZkClient implements the org.apache.zookeeper.Watcher interface, so the process() callback can be invoked at any time by the background ZK event thread [1]. This callback method is not synchronized against the other ZkClient public methods, however. So if a state change event occurs that requires a reconnection [2], the internal ZkConnection is closed while reconnecting [3] which sets its org.apache.zookeeper.ZooKeeper to null [4] resulting in the NullPointerException if the process is concurrently using the ZkClient to read or write data.

        [1] http://zookeeper.apache.org/doc/r3.3.4/zookeeperProgrammers.html#Java+Binding
        [2] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L457
        [3] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L953-954
        [4] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkConnection.java#L79

        Show
        Andrew Olson added a comment - We are seeing a similar NPE in our application's direct usage of the ZkClient, unrelated to commitOffsets. Here are a couple example stack traces. Caused by: java.lang.NullPointerException at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) Caused by: java.lang.NullPointerException at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115) at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813) at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808) at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777) ZkClient implements the org.apache.zookeeper.Watcher interface, so the process() callback can be invoked at any time by the background ZK event thread [1] . This callback method is not synchronized against the other ZkClient public methods, however. So if a state change event occurs that requires a reconnection [2] , the internal ZkConnection is closed while reconnecting [3] which sets its org.apache.zookeeper.ZooKeeper to null [4] resulting in the NullPointerException if the process is concurrently using the ZkClient to read or write data. [1] http://zookeeper.apache.org/doc/r3.3.4/zookeeperProgrammers.html#Java+Binding [2] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L457 [3] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L953-954 [4] https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkConnection.java#L79
        Hide
        Andrew Olson added a comment -

        Here's another example stack trace that we have seen.

        java.lang.NullPointerException
        	at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
        	at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:439)
        	at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:436)
        	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        	at org.I0Itec.zkclient.ZkClient.exists(ZkClient.java:436)
        	at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:846)
        	at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:843)
        	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        	at org.I0Itec.zkclient.ZkClient.watchForChilds(ZkClient.java:843)
        	at org.I0Itec.zkclient.ZkClient.subscribeChildChanges(ZkClient.java:114)
        	at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:713)
        	at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.(ZookeeperConsumerConnector.scala:756)
        	at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
        	at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:96)
        
        Show
        Andrew Olson added a comment - Here's another example stack trace that we have seen. java.lang.NullPointerException at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95) at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:439) at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:436) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.exists(ZkClient.java:436) at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:846) at org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:843) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.watchForChilds(ZkClient.java:843) at org.I0Itec.zkclient.ZkClient.subscribeChildChanges(ZkClient.java:114) at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:713) at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.(ZookeeperConsumerConnector.scala:756) at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145) at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:96)
        Hide
        Andrew Olson added a comment -

        I have reported this issue to the zkclient project: https://github.com/sgroschupf/zkclient/issues/25

        Show
        Andrew Olson added a comment - I have reported this issue to the zkclient project: https://github.com/sgroschupf/zkclient/issues/25
        Hide
        John Humphreys added a comment -

        Has any progress been made on this, or do any workarounds exist yet? Has anyone determined what causes it in particular?

        We're getting it fairly regularly and I would love to know how to mitigate the issue.

        Show
        John Humphreys added a comment - Has any progress been made on this, or do any workarounds exist yet? Has anyone determined what causes it in particular? We're getting it fairly regularly and I would love to know how to mitigate the issue.
        Hide
        Johannes Zillmann added a comment -

        Hi guys,

        just had a look at this.
        Think there are only 2 possibilities that such an exception can occur:

        • 1) a null zkConnection is passed in
        • 2) a retryUntilConnected action wakes up and the client was closed in meantime

        I could reproduce the NPE for case 2 and changed the code to throw an clear exception instead of risking unclear follow up exception like the NPE's.
        See https://github.com/sgroschupf/zkclient/commit/0630c9c6e67ab49a51e80bfd939e4a0d01a69dfe

        HTH

        PS: this is part of the zkclient-0.5 release which should be online in a few hours!

        Show
        Johannes Zillmann added a comment - Hi guys, just had a look at this. Think there are only 2 possibilities that such an exception can occur: 1) a null zkConnection is passed in 2) a retryUntilConnected action wakes up and the client was closed in meantime I could reproduce the NPE for case 2 and changed the code to throw an clear exception instead of risking unclear follow up exception like the NPE's. See https://github.com/sgroschupf/zkclient/commit/0630c9c6e67ab49a51e80bfd939e4a0d01a69dfe HTH PS: this is part of the zkclient-0.5 release which should be online in a few hours!
        Hide
        John Humphreys added a comment -

        Awesome, thank you for the quick follow-up .

        Show
        John Humphreys added a comment - Awesome, thank you for the quick follow-up .

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Yonghui Zhao
          • Votes:
            5 Vote for this issue
            Watchers:
            10 Start watching this issue

            Dates

            • Created:
              Updated:

              Development