Details
Bug
Status: Open
Major
Resolution: Unresolved
1.1.0
None
None
OS: RHEL 7
Server: VM
Description
We are seeing this issue in production: our Spark consumer dies because of an offset problem.
We observed the following error on the problematic Kafka broker:
------------------------------------------------------------------
[2019-03-18 14:40:14,100] WARN Unable to reconnect to ZooKeeper service, session 0x1692e9ff4410004 has expired (org.apache.zookeeper.ClientCnxn)
[2019-03-18 14:40:14,100] INFO Unable to reconnect to ZooKeeper service, session 0x1692e9ff4410004 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
-----------------------------------------------------------------------------------
Error from another broker when talking to the problematic broker:
[2019-03-18 14:40:14,107] INFO [ReplicaFetcher replicaId=3, leaderId=5, fetcherId=0] Error sending fetch request (sessionId=2127346653, epoch=27048427) to node 5: java.nio.channels.ClosedSelectorException. (org.apache.kafka.clients.FetchSessionHandler)
----------------------------------------------------------------------------------------------------------------
All topics have a replication factor of 3, and this issue occurs when one of the brokers is having problems. We are using SCRAM authentication (SHA-256) and SSL.
The Spark job died with the following error:
ERROR 2019-03-18 07:40:57,178 7924 org.apache.spark.executor.Executor [Executor task launch worker for task 16] Exception in task 27.0 in stage 0.0 (TID 16)
java.lang.AssertionError: assertion failed: Beginning offset 115204574 is after the ending offset 115204516 for topic <topic_name> partition 37. You either provided an invalid fromOffset, or the Kafka topic has been damaged
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:175)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
-----------------------------------------------------------
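To make the failing condition explicit, here is a minimal sketch of the check the assertion message describes: the requested beginning offset must not be after the ending offset. The class and method names (OffsetRangeCheck, isValidRange, offsetsAhead) are hypothetical and are not part of Spark or Kafka; this only illustrates the arithmetic on the two offsets from the error above.

```java
// Hypothetical helper for illustration only; not part of Spark or Kafka.
final class OffsetRangeCheck {
    private OffsetRangeCheck() {}

    // A range can be read only if the beginning offset is not after the ending offset,
    // which is the condition the assertion message in the stack trace reports as violated.
    static boolean isValidRange(long fromOffset, long untilOffset) {
        return fromOffset <= untilOffset;
    }

    // Number of offsets by which the requested start is ahead of the reported end
    // (0 when the range is valid).
    static long offsetsAhead(long fromOffset, long untilOffset) {
        return Math.max(0L, fromOffset - untilOffset);
    }
}
```

With the values from the error, isValidRange(115204574L, 115204516L) is false and offsetsAhead(115204574L, 115204516L) is 58, so the consumer's starting position for partition 37 is 58 offsets past the ending offset it was given.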
Please let me know if you need more details.