Bahir / BAHIR-168

Kinesis support in Structured Streaming

Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Spark-2.3.0
    • None
    • None

    Description

      Implement Kinesis-based sources and sinks for Structured Streaming.

      Kinesis Sources

      I hope that a Kinesis source will be contributed to Apache Bahir, as suggested in the comments on SPARK-18165.

      Kinesis Sinks

      I've implemented a Sink here: https://github.com/shimamoto/bahir/tree/kinesis-writer/sql-kinesis
      This requires Spark 2.3 and the DataSource V2 API. I plan to open a PR, but Bahir does not support Spark 2.3 yet, so BAHIR-167 has to be resolved first.

      A brief overview follows.

      Description

      Add a new Kinesis Sink and Kinesis Relation for writing streaming and batch queries, respectively, to AWS Kinesis.

      The DataFrame written to Kinesis should have the following columns in its schema:

      Column                   Type
      partitionKey (optional)  string
      data (required)          string or binary

      If the partitionKey column is missing, the SHA-256 digest of data, encoded as a hex string, is automatically added as the partition key.
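      The default partition key can be illustrated with a short sketch. It is not code from the linked branch: the object and method names are hypothetical, and it assumes string data is hashed as its UTF-8 bytes and that the digest is rendered as lowercase hex.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper illustrating the default partition key; not the sink's actual code.
object PartitionKeys {
  // Lowercase hex encoding of the SHA-256 digest of the raw payload bytes.
  def sha256Hex(data: Array[Byte]): String =
    MessageDigest.getInstance("SHA-256")
      .digest(data)
      .map(b => "%02x".format(b & 0xff))
      .mkString

  // Assumption: string payloads are hashed as their UTF-8 bytes.
  def sha256Hex(data: String): String =
    sha256Hex(data.getBytes(StandardCharsets.UTF_8))
}
```

      Every key produced this way is 64 hex characters long. Inside a Spark query, org.apache.spark.sql.functions.sha2(col("data"), 256) computes a comparable hex-encoded SHA-256 digest, which could be used to add a partitionKey column explicitly before writing.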

      Streaming Kinesis Sink

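      import spark.implicits._

      // inputStream supplies (partitionKey, data) pairs, e.g. a MemoryStream[(String, String)]
      val df = inputStream.toDS().toDF("partitionKey", "data")

      // start() launches the query and returns a StreamingQuery handle;
      // checkpointDir is a java.io.File pointing at the checkpoint directory
      val query = df.writeStream
        .format("kinesis")
        .option("streamName", "test-stream")
        .option("region", "us-east-1")
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .start()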
      

      Batch Kinesis Sink

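      import spark.implicits._  // provides toDF on local Seqs

      val df = Seq("partitionKey-1" -> "data1", "partitionKey-2" -> "data2")
        .toDF("partitionKey", "data")

      // Batch write: no checkpointLocation is needed
      df.write
        .format("kinesis")
        .option("streamName", "test-stream")
        .option("region", "us-east-1")
        .save()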
      

      Configuration

      The following options must be set for both batch and streaming queries.

      Option      Value   Default  Meaning
      streamName  string  -        Name of the Kinesis stream to write to.
      region      string  -        AWS region of the Kinesis stream.
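      Because neither required option has a default, a query that omits either one has to be rejected. A minimal sketch of that check follows; the object and method names are hypothetical, and it assumes option keys are matched case-insensitively, as Spark data source options generally are.

```scala
// Hypothetical check for the required sink options; not the sink's actual code.
object RequiredKinesisOptions {
  val Required: Seq[String] = Seq("streamName", "region")

  // Returns the required option names absent from `options`, comparing keys case-insensitively.
  def missing(options: Map[String, String]): Seq[String] = {
    val present = options.keySet.map(_.toLowerCase)
    Required.filterNot(name => present.contains(name.toLowerCase))
  }

  // Fails fast (IllegalArgumentException) with a message listing every missing option.
  def validate(options: Map[String, String]): Unit = {
    val absent = missing(options)
    require(absent.isEmpty, s"Missing required option(s): ${absent.mkString(", ")}")
  }
}
```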

      The following configurations are optional.

      Option      Value   Default  Meaning
      chunksize   int     50       Maximum number of records sent in a single PutRecords request.
      endpoint    string  (none)   Custom Kinesis service endpoint; set this only when using a non-standard endpoint.
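      The chunksize option acts as a batch size for the Kinesis PutRecords API: rows are grouped into requests of at most chunksize records each. The sketch below illustrates this under stated assumptions, with hypothetical names: the option key is matched case-insensitively, and values above 500 are rejected because PutRecords accepts at most 500 records per request. That bound check is this sketch's own choice, not necessarily the linked sink's behavior.

```scala
// Hypothetical illustration of how chunksize could split records into PutRecords calls.
object PutRecordsChunking {
  val DefaultChunkSize = 50
  // AWS Kinesis PutRecords accepts at most 500 records per request.
  val MaxRecordsPerRequest = 500

  // Reads "chunksize" (case-insensitively), falling back to the documented default of 50.
  def chunkSize(options: Map[String, String]): Int = {
    val size = options.collectFirst {
      case (key, value) if key.equalsIgnoreCase("chunksize") => value.trim.toInt
    }.getOrElse(DefaultChunkSize)
    require(size >= 1 && size <= MaxRecordsPerRequest,
      s"chunksize must be between 1 and $MaxRecordsPerRequest, got $size")
    size
  }

  // Splits records into consecutive groups, one per PutRecords request; the last may be smaller.
  def chunks[A](records: Seq[A], size: Int): List[Seq[A]] =
    records.grouped(size).toList
}
```

      With the default of 50, this sketch would send a partition of 120 rows as three requests of 50, 50 and 20 records.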

          People

            Assignee: Unassigned
            Reporter: Takako Shimamoto (shimamoto)
            Votes: 0
            Watchers: 3
