Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version: 2.4.0
Description
We had a requirement to write a custom org.apache.kafka.clients.producer.Partitioner to use with the Kafka data source available in the package "spark-sql-kafka-0-10_2.11".
Ideally, properties set on the producer are available to the Partitioner's method:
configure(Map<String, ?> configs)
However, we found that custom properties passed as options to the Kafka-format DataFrameWriter are not available to the Partitioner, whether or not the property is prefixed with the literal "kafka.".
Only the configs listed at https://kafka.apache.org/documentation/#producerconfigs are passed to the Partitioner, but in some cases custom properties are required to initialize the Partitioner.
There should therefore be a provision to set custom properties as options on the Kafka data source, not just producer configs. Otherwise, a custom partitioner cannot be initialized and implemented as needed.
For example -
df.write.format("kafka").option("kafka.customproperty1", "value1").option("kafka.partitioner.class", "com.mycustom.ipartitioner")
...
package com.mycustom;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class ipartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs); // customproperty1 is missing here, although it should be available
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        ...
    }

    @Override
    public void close() {
    }
}
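For illustration, here is a minimal self-contained sketch (not Spark's actual code; PrefixDemo and toProducerParams are hypothetical names) of the "kafka."-prefix convention described above, where prefixed options are stripped and handed to the underlying producer. The reported problem is that, on top of this mapping, non-standard entries such as customproperty1 never reach the Partitioner.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the "kafka." prefix convention: options whose key
// starts with "kafka." lose the prefix and become producer parameters;
// everything else stays a data-source option. The bug report's expectation
// is that "kafka.customproperty1" would survive this mapping and show up
// in the Partitioner's configure(Map) call.
public class PrefixDemo {
    static Map<String, String> toProducerParams(Map<String, String> options) {
        Map<String, String> producerParams = new HashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            String key = e.getKey().toLowerCase(Locale.ROOT);
            if (key.startsWith("kafka.")) {
                // Drop the "kafka." prefix; the remainder is the producer config key.
                producerParams.put(key.substring("kafka.".length()), e.getValue());
            }
        }
        return producerParams;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.partitioner.class", "com.mycustom.ipartitioner");
        options.put("kafka.customproperty1", "value1");
        options.put("topic", "events"); // not "kafka."-prefixed, stays a source option
        System.out.println(toProducerParams(options));
    }
}
```

Under this convention both partitioner.class and customproperty1 reach the producer parameters, which is why the observed filtering of customproperty1 looks like a gap in the data source rather than in the prefix handling.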