Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
3.0.0
None
Description
I am trying to read data from a secured Kafka cluster using Spark Structured
Streaming. I am using the library
"spark-sql-kafka-0-10_2.12":"3.0.0-preview" because it allows specifying a
custom consumer group id (instead of Spark generating its own group id).
Dependency used in code:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>3.0.0-preview</version>
</dependency>
Logs:
The following error occurs even after specifying the required JAAS
configuration in the Spark options.
Caused by: java.lang.IllegalArgumentException: requirement failed: Delegation token must exist for this connector.
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.kafka010.KafkaTokenUtil$.isConnectorUsingCurrentToken(KafkaTokenUtil.scala:299)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:533)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:275)
Spark configuration used to read from Kafka:
val kafkaDF = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootStrapServer)
  .option("subscribe", kafkaTopic)
  // Set the JAAS configuration
  .option("kafka.sasl.jaas.config", KAFKA_JAAS_SASL)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  // Set a custom consumer group id
  .option("kafka.group.id", "test_cg")
  .load()
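The report does not show the value of KAFKA_JAAS_SASL. For readers reproducing the issue, here is a minimal sketch of how such a string might be built for the PLAIN mechanism, assuming the standard Kafka PlainLoginModule entry format. The object name, the helper name and the credentials are hypothetical and are not taken from the report.

```scala
object JaasConfigExample {
  // Builds a JAAS entry for SASL/PLAIN in the format Kafka clients expect
  // for the "sasl.jaas.config" property.
  // Assumption: username and password contain no double quotes or backslashes,
  // so no escaping is performed here.
  def plainJaasConfig(username: String, password: String): String =
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      s"""username="$username" password="$password";"""

  def main(args: Array[String]): Unit = {
    // Hypothetical credentials for illustration only; real ones should come
    // from a secret store rather than source code.
    val KAFKA_JAAS_SASL = plainJaasConfig("my-user", "my-password")
    println(KAFKA_JAAS_SASL)
  }
}
```

The resulting string would then be passed as the value of the "kafka.sasl.jaas.config" option.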
The following document states that obtaining a delegation token can be
disabled:
https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html
I tried setting the property spark.security.credentials.kafka.enabled to
false in the Spark config, but the job still fails with the same error.
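For reference, one way the property could be set is on the session builder, before the SparkContext is created. This is only a sketch of the attempted workaround, not a fix; the report states the error persists with this setting. The application name and the local master are assumptions made so the example runs standalone.

```scala
import org.apache.spark.sql.SparkSession

object DisableKafkaTokenExample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("kafka-no-delegation-token") // hypothetical application name
      .master("local[*]")                   // assumption: local run for illustration
      // Ask Spark not to obtain Kafka delegation tokens.
      // This must be set before the SparkContext starts to take effect.
      .config("spark.security.credentials.kafka.enabled", "false")
      .getOrCreate()

    // Echo the value back from the active SparkConf to confirm it was applied.
    println(sparkSession.sparkContext.getConf
      .get("spark.security.credentials.kafka.enabled"))

    sparkSession.stop()
  }
}
```

When the job is launched with spark-submit, the same property can instead be passed as --conf spark.security.credentials.kafka.enabled=false.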