Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 0.10.1, 1.0.1, 1.0.2, 1.1.0
- Fix Version/s: None
Description
During two somewhat extended outages of our Kafka cluster, we experienced a problem with our Storm topologies consuming data from that Kafka cluster.
Almost all of our topologies silently stopped processing data from some of the topics/partitions, and the only way to fix the situation was to restart those topologies.
I tracked down one occurrence of the failure to this worker, which was running one of the KafkaSpouts:
2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) ~[stormjar.jar:?]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[stormjar.jar:?]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=tagging_log, partitionMap={0=kafka-03:9092, 1=kafka-12:9092, 2=kafka-08:9092, 3=kafka-05:9092}}
2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, Partition{host=kafka-12:9092, topic=tagging_log, partition=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}]
2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition information from: /log_processing/tagging/kafka-tagging-spout/partition_1 --> {"partition":1,"offset":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
	at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
	at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) [stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.net.SocketTimeoutException
	at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
	at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
	at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
	... 5 more
2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR]
java.lang.RuntimeException: java.net.SocketTimeoutException
	at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
	at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
	at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
	at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
	at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
	... 5 more
2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
There was no further output in the log after that until the topology was manually killed.
As you can see, the java.net.SocketTimeoutException escapes the storm-kafka code (probably a problem in and of itself), but the worker is not killed. The thread that calls the spout's .nextTuple method, on the other hand, does exit, so the spout silently stops consuming while the worker process lives on.
This is the culprit line: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270
I see that this has been fixed in the Java port of the executor code by explicitly excluding java.net.SocketTimeoutException from the condition.
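To illustrate why the condition matters: java.net.SocketTimeoutException is a subclass of java.io.InterruptedIOException, so a suppression check written against InterruptedIOException (intended to catch benign interruptions during shutdown) also matches socket timeouts, and the error handler never kills the worker. A minimal sketch of the two conditions (the class and method names here are mine, not Storm's):

```java
// Sketch only: demonstrates the class-hierarchy trap, not Storm's actual code.
public class SuppressionSketch {

    // Buggy shape of the condition: SocketTimeoutException extends
    // InterruptedIOException, so a fetch timeout is treated like a
    // benign interruption and the worker is never killed.
    static boolean shouldSuppress(Throwable error) {
        return error instanceof java.io.InterruptedIOException;
    }

    // Fixed shape, as in the Java port: still suppress real
    // interruptions, but let socket timeouts kill the worker.
    static boolean shouldSuppressFixed(Throwable error) {
        return error instanceof java.io.InterruptedIOException
            && !(error instanceof java.net.SocketTimeoutException);
    }

    public static void main(String[] args) {
        Throwable timeout = new java.net.SocketTimeoutException();
        System.out.println(shouldSuppress(timeout));      // true  -> error swallowed
        System.out.println(shouldSuppressFixed(timeout)); // false -> worker dies, gets restarted
    }
}
```

With the fixed condition the worker dies on a timeout and the supervisor restarts it, instead of leaving a live worker with a dead spout thread.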
I will open a pull request with a backport tomorrow.
Attachments
Issue Links
- is related to: STORM-2194 ReportErrorAndDie doesn't always die (Resolved)
- links to