Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 2.1.0
None
None
Environment:
Spark 2.1.0
Spark Streaming Kafka 010
YARN - Cluster Mode
CDH 5.8.4
CentOS Linux release 7.2
Description
We recently upgraded our streaming app, which uses the Direct Stream API, to Spark 2 (spark-streaming-kafka-0-10 2.1.0) with Kafka 0.10.0.0 and the 0.10 consumer. We see abnormal delays after the application has run for a couple of hours, or after it has consumed approximately 5 million records.
See screenshots 1 and 2.
Processing time suddenly jumps from ~15 seconds (usual for this app) to ~3 minutes, and from then on it keeps degrading.
We found that the delay is caused by certain tasks that take exactly the configured Kafka consumer 'request.timeout.ms'. We confirmed this by setting the timeout to several different values: in each case the slow tasks took exactly the configured timeout.
See screenshot 3.
I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method and the subsequent poll(timeout) call in CachedKafkaConsumer.scala are timing out on some partitions without reading data, yet the executor logs the task as successfully completed after exactly the timeout duration. Note that most other tasks complete successfully within milliseconds. The timeout most likely originates in org.apache.kafka.clients.consumer.KafkaConsumer, and we did not observe any difference in network latency.
We have observed this across multiple clusters and multiple apps, with and without TLS/SSL. Spark 1.6 with the 0-8 consumer seems fine, with consistent performance.
17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446288
17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)
17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, partition 0 offsets 776843 -> 779591
17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843
17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver
17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446329
17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 (TID 446329)
17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 and clearing cache
17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 6807
17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 6807 took 4 ms
17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as values in m
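The timestamps of these log statements differ by exactly the timeout duration: the initial fetch for TID 446288 is logged at 10:30:06 and the task finishes at 10:30:56, i.e. 50 seconds, which matches request.timeout.ms = 50000 in the config below.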
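To scan other executor logs for the same signature, here is a minimal Python sketch. It assumes the default executor log layout shown above (yy/MM/dd HH:mm:ss LEVEL logger: message); the helper names task_durations and suspicious are hypothetical. Each task is measured from its "Running task" line to its "Finished task" line, keyed by TID, because the "Initial fetch" line carries no TID. Tasks whose duration falls within a tolerance of request.timeout.ms are flagged. Log timestamps have only one-second resolution, so the default tolerance is one second.

```python
import re
from datetime import datetime

# Assumed layout: "yy/MM/dd HH:mm:ss LEVEL logger: message"
LINE_RE = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \w+ (\S+): (.*)$")
RUNNING_RE = re.compile(r"Running task \S+ in stage \S+ \(TID (\d+)\)")
FINISHED_RE = re.compile(r"Finished task \S+ in stage \S+ \(TID (\d+)\)")


def task_durations(lines):
    """Return {tid: seconds} between 'Running task' and 'Finished task' per TID."""
    started = {}
    durations = {}
    for line in lines:
        m = LINE_RE.match(line.strip())
        if not m:
            continue  # skip lines that do not follow the assumed layout
        ts = datetime.strptime(m.group(1), "%y/%m/%d %H:%M:%S")
        msg = m.group(3)
        run = RUNNING_RE.search(msg)
        if run:
            started[run.group(1)] = ts
            continue
        fin = FINISHED_RE.search(msg)
        if fin and fin.group(1) in started:
            start = started.pop(fin.group(1))
            durations[fin.group(1)] = (ts - start).total_seconds()
    return durations


def suspicious(durations, request_timeout_ms, tolerance_s=1.0):
    """Sorted TIDs whose duration is within tolerance_s of request.timeout.ms."""
    target_s = request_timeout_ms / 1000.0
    return sorted(tid for tid, d in durations.items() if abs(d - target_s) <= tolerance_s)
```

Applied to the excerpt above, TID 446288 comes out at 50.0 seconds and is flagged against request.timeout.ms = 50000. TID 446329 has no "Finished task" line in the excerpt, so it is left out.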
Our consumer config is below.
17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [xxxxx.xxx.xxx:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 50000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = default1
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
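When varying request.timeout.ms for the experiment described above, keep one constraint in mind. As far as we know, the Kafka 0.10.0 KafkaConsumer constructor throws a ConfigException unless request.timeout.ms is greater than both session.timeout.ms and fetch.max.wait.ms. Treat this as an assumption and verify it against the client version in use. The minimal Python sketch below reads a ConsumerConfig dump like the one above and checks that constraint. The helper names parse_consumer_config and check_request_timeout are hypothetical.

```python
def parse_consumer_config(text):
    """Parse 'key = value' lines from a ConsumerConfig log dump into a dict of strings."""
    config = {}
    for line in text.splitlines():
        key, sep, value = line.partition(" = ")
        if sep:  # lines without ' = ' (log header, empty values) are skipped
            config[key.strip()] = value.strip()
    return config


def check_request_timeout(config):
    """List violations of the assumed rule: request.timeout.ms must exceed
    session.timeout.ms and fetch.max.wait.ms."""
    request_ms = int(config["request.timeout.ms"])
    problems = []
    for other in ("session.timeout.ms", "fetch.max.wait.ms"):
        if request_ms <= int(config[other]):
            problems.append(
                f"request.timeout.ms ({request_ms}) must exceed {other} ({config[other]})"
            )
    return problems
```

With the values above (request.timeout.ms = 50000, session.timeout.ms = 30000, fetch.max.wait.ms = 500), the check reports no problems. Under this assumption, any test value at or below 30000 would be rejected before the consumer starts.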