Details
- Type: New Feature
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version: Spark-2.3.0
Description
Implement Kinesis-based sources and sinks for Structured Streaming.
Kinesis Sources
I hope that this will be contributed to Apache Bahir, as commented in SPARK-18165.
Kinesis Sinks
I've implemented a Sink here: https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis
This requires Spark 2.3 and the DataSource V2 API. I plan on opening a PR, but Bahir doesn't support Spark 2.3 yet, so we need to handle BAHIR-167 first.
A brief overview is listed below.
Description
Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch queries, respectively, to AWS Kinesis.
The DataFrame being written to Kinesis should have the following columns in its schema:
Column | Type |
---|---|
partitionKey (optional) | string |
data (required) | string or binary |
If the partition key column is missing, a SHA-256 digest of the data, rendered as a hex string, will automatically be used as the partition key.
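The digest fallback described above can be sketched as follows. This is an illustration only, and `sha256HexPartitionKey` is a hypothetical helper name, not part of the actual sink code:

```scala
import java.security.MessageDigest

// Hypothetical helper mirroring the documented fallback: when no
// partitionKey column is present, derive one as the SHA-256 digest
// of the data column, rendered as a lowercase hex string.
def sha256HexPartitionKey(data: Array[Byte]): String =
  MessageDigest.getInstance("SHA-256")
    .digest(data)                  // 32 raw digest bytes
    .map("%02x".format(_))         // each byte as two hex chars
    .mkString                      // 64-character hex string
```

Because the digest is deterministic, identical payloads always map to the same Kinesis shard, while distinct payloads are spread across shards.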
Streaming Kinesis Sink
```scala
val df = inputStream.toDS().toDF("partitionKey", "data")

val writer = df.writeStream
  .format("kinesis")
  .option("streamName", "test-stream")
  .option("region", "us-east-1")
  .option("checkpointLocation", checkpointDir.getCanonicalPath)
  .start()
```
Batch Kinesis Sink
```scala
val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
  .toDF("partitionKey", "data")

df.write
  .format("kinesis")
  .option("streamName", "test-stream")
  .option("region", "us-east-1")
  .save()
```
Configuration
The following options must be set for both batch and streaming queries.
Option | value | default | meaning |
---|---|---|---|
streamName | string | - | The stream name associated with the Sink. |
region | string | - | The AWS region of the Kinesis stream. |
The following configurations are optional.
Option | value | default | meaning |
---|---|---|---|
chunksize | int | 50 | Rate limit on the maximum number of records sent per PutRecords request. |
endpoint | string | (none) | Only use this if using a non-standard service endpoint. |
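As a sketch of how the optional settings combine with the required ones, a streaming write might look like the following. This assumes a DataFrame `df` with the schema described above; the endpoint URL is a hypothetical local Kinesis emulator address, shown only to illustrate the non-standard-endpoint case:

```scala
// Sketch only: "chunksize" caps records per PutRecords request;
// "endpoint" overrides the service endpoint (the URL below is a
// hypothetical local emulator, not a real AWS endpoint).
val query = df.writeStream
  .format("kinesis")
  .option("streamName", "test-stream")          // required
  .option("region", "us-east-1")                // required
  .option("chunksize", "100")                   // optional, default 50
  .option("endpoint", "https://localhost:4566") // optional, default (none)
  .option("checkpointLocation", checkpointDir.getCanonicalPath)
  .start()
```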