    • Spark 2.1.0
      Spark Streaming Kafka 010
      Yarn - Cluster Mode
      CDH 5.8.4
      CentOS Linux release 7.2


      We recently upgraded our streaming app, which uses a direct stream, to Spark 2 (spark-streaming-kafka-0-10, 2.1.0) with the Kafka 0.10 consumer. We see abnormal delays after the application has run for a couple of hours or has consumed roughly 5 million records.

      See screenshots 1 & 2.

      The processing time suddenly jumps from ~15 seconds (usual for this app) to ~3 minutes, and from then on it keeps degrading.

      The delay comes from certain tasks that take exactly as long as the configured Kafka consumer 'request.timeout.ms'. We confirmed this by setting the timeout to different values; the duration of the slow tasks followed it each time.

      See screenshot 3.

      I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method and the subsequent poll(timeout) call in CachedKafkaConsumer.scala are timing out on some of the partitions without reading any data. The executor nevertheless logs the task as successfully completed, after exactly the timeout duration. Note that most other tasks complete successfully within milliseconds. The timeout most likely originates in org.apache.kafka.clients.consumer.KafkaConsumer, and we did not observe any difference in network latency.

      We have observed this across multiple clusters and multiple apps, with and without TLS/SSL. Spark 1.6 with the 0-8 consumer seems fine, with consistent performance.

      17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446288
      17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)
      17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, partition 0 offsets 776843 -> 779591
      17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843
      17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver
      17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446329
      17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 (TID 446329)
      17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 and clearing cache
      17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 6807
      17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
      17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 6807 took 4 ms
      17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as values in m

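      In the log above, the 'Initial fetch' line (10:30:06) and the 'Finished task' line (10:30:56) for task 11.0 are exactly 50 seconds apart, which matches the configured request.timeout.ms of 50000.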
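To make the comparison between log timestamps and the timeout reproducible, here is a minimal sketch that computes the gap between two executor log lines. It assumes the default Spark log4j timestamp layout (yy/MM/dd HH:mm:ss) seen in the excerpt; the object name LogGap and the method secondsBetween are made up for illustration and are not part of Spark or Kafka.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of Spark: measures the time between two log lines.
object LogGap {
  // Assumption: timestamps use the yy/MM/dd HH:mm:ss layout shown in the excerpt.
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // The timestamp is the first 17 characters of a (trimmed) log line.
  private def timestamp(line: String): LocalDateTime =
    LocalDateTime.parse(line.trim.take(17), fmt)

  /** Seconds elapsed between the timestamps of two log lines. */
  def secondsBetween(first: String, second: String): Long =
    Duration.between(timestamp(first), timestamp(second)).getSeconds

  def main(args: Array[String]): Unit = {
    val fetch = "17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843"
    val done  = "17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver"
    val requestTimeoutMs = 50000L // request.timeout.ms from our consumer config
    val gapMs = secondsBetween(fetch, done) * 1000
    println(s"gap = $gapMs ms, matches request.timeout.ms: ${gapMs == requestTimeoutMs}")
  }
}
```

Running the same check over the other slow tasks should show whether every one of them lasts exactly one timeout, or whether some last a multiple of it.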

      Our consumer config is below.

      17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
      17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
      metric.reporters = []
      metadata.max.age.ms = 300000
      partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
      reconnect.backoff.ms = 50
      sasl.kerberos.ticket.renew.window.factor = 0.8
      max.partition.fetch.bytes = 1048576
      bootstrap.servers = [xxxxx.xxx.xxx:9092]
      ssl.keystore.type = JKS
      enable.auto.commit = true
      sasl.mechanism = GSSAPI
      interceptor.classes = null
      exclude.internal.topics = true
      ssl.truststore.password = null
      client.id =
      ssl.endpoint.identification.algorithm = null
      max.poll.records = 2147483647
      check.crcs = true
      request.timeout.ms = 50000
      heartbeat.interval.ms = 3000
      auto.commit.interval.ms = 5000
      receive.buffer.bytes = 65536
      ssl.truststore.type = JKS
      ssl.truststore.location = null
      ssl.keystore.password = null
      fetch.min.bytes = 1
      send.buffer.bytes = 131072
      value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      group.id = default1
      retry.backoff.ms = 100
      ssl.secure.random.implementation = null
      sasl.kerberos.kinit.cmd = /usr/bin/kinit
      sasl.kerberos.service.name = null
      sasl.kerberos.ticket.renew.jitter = 0.05
      ssl.trustmanager.algorithm = PKIX
      ssl.key.password = null
      fetch.max.wait.ms = 500
      sasl.kerberos.min.time.before.relogin = 60000
      connections.max.idle.ms = 540000
      session.timeout.ms = 30000
      metrics.num.samples = 2
      key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
      ssl.protocol = TLS
      ssl.provider = null
      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      ssl.keystore.location = null
      ssl.cipher.suites = null
      security.protocol = PLAINTEXT
      ssl.keymanager.algorithm = SunX509
      metrics.sample.window.ms = 30000
      auto.offset.reset = latest
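For anyone trying to reproduce this, the sketch below restates the timeout-related settings from the dump above as a kafkaParams map of the shape that the direct stream accepts. It is an illustration only: the object name ConsumerSettings and the helper millis are hypothetical, the map lists a subset of the logged values rather than what our application literally passes, and the final check reflects my understanding (an assumption, not verified here) that the 0.10 consumer expects request.timeout.ms to be larger than both session.timeout.ms and fetch.max.wait.ms.

```scala
// Hypothetical container for the settings; values are copied from the logged ConsumerConfig.
object ConsumerSettings {
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers"  -> "xxxxx.xxx.xxx:9092",
    "group.id"           -> "default1",
    "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> "true",
    "request.timeout.ms" -> "50000", // the value the slow tasks' duration matches
    "session.timeout.ms" -> "30000",
    "fetch.max.wait.ms"  -> "500"
  )

  /** Reads a millisecond setting from the map as a Long. */
  def millis(key: String): Long = kafkaParams(key).toString.toLong

  /** Assumed ordering constraint between the three timeouts (see note above). */
  def timeoutsConsistent: Boolean =
    millis("request.timeout.ms") > millis("session.timeout.ms") &&
      millis("request.timeout.ms") > millis("fetch.max.wait.ms")
}
```

When varying request.timeout.ms for a test run, keeping timeoutsConsistent true avoids changing more than one variable at a time.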


