Spark / SPARK-20462

Spark-Kinesis Direct Connector


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: DStreams
      My colleagues at AWS and I have been discussing improvements to the existing Kinesis-Spark connector, and we'd like to propose a new implementation. It mimics much of the functionality of the Kafka Direct Stream and is detailed below:
      Current Implementation
      The current implementation of the Kinesis Spark connector starts by creating the initial block RDD, which runs the compute method on the driver. The driver retrieves a shard iterator and uses it to make multiple getRecords calls. It discards the records and passes SequenceNumberRanges to the executors. The executors then make the getRecords requests again and pass the records to child RDDs, which execute any user-defined transformations or actions.
       
      Problems with the Current Implementation

          Checkpointing is currently handled by a time-based process running on a background thread. Data can be checkpointed as soon as it is received from Kinesis, before a terminal action has processed it, so some data can go unprocessed.
          The current Kinesis receiver requires a long-running receiver process that unnecessarily takes up a single core per worker per stream, which can be a significant performance bottleneck for customers whose Spark application processes multiple streams. Additionally, making multiple getRecords calls in the driver as well as in the executors immediately doubles both the processing latency and the number of transactions consumed per shard.
          There is no integration with Spark’s backpressure implementation. This is especially troublesome for customers with cyclic changes in throughput or a large number of downstream transformations (which implicitly add variability to the time a batch takes to complete).
          The Spark application has to be restarted to pick up shard events such as splits and merges.
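
      To make the doubled-call cost in the second item concrete, here is a rough back-of-the-envelope sketch. It assumes the documented Kinesis read limit of 5 getRecords transactions per second per shard; the function name and the workload figures (4 calls per 2-second batch) are hypothetical and chosen only for illustration.

```python
# Assumed Kinesis limit: 5 read transactions per second per shard.
READ_TPS_PER_SHARD = 5


def read_budget_used(calls_per_batch: int, batch_interval_s: float, readers: int) -> float:
    """Fraction of one shard's read-transaction budget consumed when `readers`
    components each make `calls_per_batch` getRecords calls per batch."""
    calls_per_second = calls_per_batch * readers / batch_interval_s
    return calls_per_second / READ_TPS_PER_SHARD


# Hypothetical workload: 4 getRecords calls per shard per 2-second batch.
driver_and_executor = read_budget_used(4, 2.0, readers=2)  # driver reads, executor reads again
executor_only = read_budget_used(4, 2.0, readers=1)        # only the executor reads

print(f"driver + executor: {driver_and_executor:.0%} of the shard's read budget")
print(f"executor only:     {executor_only:.0%} of the shard's read budget")
```

      Under these assumed numbers, reading the same range twice leaves much less headroom for any other consumer of the shard.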

       
      Proposed Implementation
       The Spark driver requests a shard iterator for each shard, using the last sequence number processed for that shard as the start and, as the end, a timestamp offset from that sequence number by the customer’s configured Spark batch interval. SequenceNumberRanges are passed to the executors for processing. If the RDD is a terminal RDD, then checkpointing is done in the onComplete method. The user does not have to implement their own checkpointing logic, and because checkpointing only occurs after the terminal action of the user’s Spark job, no data is lost. These checkpoints are recovered by the driver at the start of the next batch. If the driver retrieves a sentinel value indicating shard end, it issues a DescribeStream request to discover newly opened shards. Resharding is therefore picked up at application runtime, with no need to restart the application.
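
      The per-batch flow described above can be sketched in a few lines. This is a minimal in-memory model, not the proposed connector: Record, read_range and run_batch are hypothetical names, integer sequence numbers stand in for Kinesis's long decimal strings, and an absolute batch_end time stands in for the timestamp offset. What it shows is the ordering: a shard's checkpoint advances only after the terminal action has succeeded.

```python
from typing import Callable, Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    sequence_number: int  # real Kinesis sequence numbers are long decimal strings
    arrival_time: float   # seconds; stands in for the record's arrival timestamp
    data: str


# In-memory stand-in for a stream: shard id -> records in sequence order.
Stream = Dict[str, List[Record]]


def read_range(stream: Stream, shard_id: str,
               after_seq: Optional[int], until_time: float) -> List[Record]:
    """Executor side: records after `after_seq` that arrived by `until_time`."""
    return [
        r for r in stream[shard_id]
        if (after_seq is None or r.sequence_number > after_seq)
        and r.arrival_time <= until_time
    ]


def run_batch(stream: Stream, checkpoints: Dict[str, int], batch_end: float,
              action: Callable[[List[Record]], None]) -> None:
    """Process one batch; advance a shard's checkpoint only after `action` succeeds."""
    for shard_id in stream:
        records = read_range(stream, shard_id, checkpoints.get(shard_id), batch_end)
        if not records:
            continue
        action(records)  # terminal action; an exception leaves the checkpoint untouched
        checkpoints[shard_id] = records[-1].sequence_number


stream = {"shard-0": [Record(10, 0.5, "a"), Record(17, 1.5, "b"), Record(42, 2.5, "c")]}
checkpoints: Dict[str, int] = {}
seen: List[str] = []


def collect(records: List[Record]) -> None:
    seen.extend(r.data for r in records)


def fail(records: List[Record]) -> None:
    raise RuntimeError("simulated job failure")


run_batch(stream, checkpoints, 2.0, collect)   # processes a and b
try:
    run_batch(stream, checkpoints, 3.0, fail)  # fails before the checkpoint moves
except RuntimeError:
    pass
after_failure = dict(checkpoints)              # snapshot taken after the failed batch
run_batch(stream, checkpoints, 3.0, collect)   # the retry picks up c
```

      Because the failed batch never advanced the checkpoint, the retry re-reads the same range, which gives at-least-once behavior in this model.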
       
      A Note on Backpressure
      We also want to integrate with Spark’s backpressure strategy. Customers can see volatility in record processing time from three sources:

          Volatility caused by cyclical or stochastic changes in throughput
          Volatility caused by unpredictable changes in the amount of work a single record represents, i.e. a customer can feed the same number of records into their Spark application and still see high variance in the processing time of the downstream actions, depending on the content of the records
          Kinesis provides sequence numbers that always increase but do not do so at a uniform rate. Therefore, if a customer does not have totally uniform put patterns over a stream, the number of records received in each time-bounded batch could be radically different.

       
      The three problems outlined cause much the same behavior and can be corrected in much the same way. In a sense, the number of records doesn’t concern us as much as the total amount of work. While the total amount of work varies with the number of records in a batch, it can also fluctuate due to other factors. What matters is that, regardless of which variable is causing the slowdown, we continuously update the rate at which we receive records until it converges on the ideal rate, one that maximizes consumption while minimizing batch failures.
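
      As an illustration of that feedback loop, here is a simplified proportional-integral rate estimator. It is loosely modeled on the PID-style estimator that Spark Streaming uses when spark.streaming.backpressure.enabled is set, but it is a sketch and not Spark's code: the class name, the default gains and the example numbers are assumptions.

```python
class RateEstimator:
    """Simplified PI controller for an ingestion rate in records per second."""

    def __init__(self, initial_rate: float, proportional: float = 1.0,
                 integral: float = 0.2, min_rate: float = 100.0) -> None:
        self.rate = initial_rate
        self.proportional = proportional
        self.integral = integral
        self.min_rate = min_rate

    def update(self, num_records: int, processing_delay_s: float,
               scheduling_delay_s: float, batch_interval_s: float) -> float:
        """Fold one completed batch into the estimate and return the new rate."""
        if num_records <= 0 or processing_delay_s <= 0:
            return self.rate  # nothing to learn from an empty batch
        # Rate the job actually sustained, whatever the cause of the slowdown.
        processing_rate = num_records / processing_delay_s
        # Proportional term: how far the allowed rate overshoots that rate.
        error = self.rate - processing_rate
        # Integral term: backlog built up while the batch waited to be
        # scheduled, expressed as records per second over one batch interval.
        backlog = scheduling_delay_s * processing_rate / batch_interval_s
        new_rate = self.rate - self.proportional * error - self.integral * backlog
        self.rate = max(new_rate, self.min_rate)
        return self.rate


# Hypothetical job that sustains about 1000 records/s but is fed 5000 records/s.
no_backlog = RateEstimator(initial_rate=5000.0)
rate_without_backlog = no_backlog.update(5000, 5.0, 0.0, 1.0)

with_backlog = RateEstimator(initial_rate=5000.0)
rate_with_backlog = with_backlog.update(5000, 5.0, 2.0, 1.0)
```

      The estimator only looks at how long a batch took and how many records it held, so it reacts the same way whether the slowdown came from more records or from more expensive records.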
       
      While research and performance testing are clearly still needed, I expect that even if the way records are requested from Kinesis requires more adjustment by the backpressure algorithm, it will still converge to an ideal rate.
      Problems Solved by Proposed Implementation

          Allows users to create a DStream from a map of the last sequence number processed, keyed by shard
          Removes the dependency on DynamoDB by allowing Spark checkpointing to integrate directly with bare-bones Kinesis getRecords requests. This also lets users choose at-most-once, at-least-once, or exactly-once processing of records, depending on their needs.
          Integrates with Spark’s backpressure mechanism (which enables other features like Spark auto-scaling, drastically increasing the overall elasticity of the infrastructure)
          Removes the need for a long-running receiver task: all getRecords calls are made directly by each executor, and an executor that finishes a given shard early is freed up to perform other tasks
          Allows a Spark application to handle dynamic resharding; Spark parallelism maps directly to Kinesis parallelism
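
      The first and last items can be sketched together: a map of last-processed sequence numbers keyed by shard, updated when shards close. The sketch is hypothetical, with advance_after_reshard an invented name, but the shard dictionaries use the field names returned by the Kinesis DescribeStream call (ShardId, ParentShardId, AdjacentParentShardId). The shard ids are shortened for readability, and a position of None is assumed to mean "read the shard from its beginning".

```python
from typing import Dict, Iterable, Optional, Set

# Last processed sequence number per shard; None means "start from the beginning".
Positions = Dict[str, Optional[str]]


def advance_after_reshard(positions: Positions, finished: Set[str],
                          shards: Iterable[dict]) -> Positions:
    """Drop fully consumed shards and add children whose parents are all done.

    `finished` holds the shards that hit end-of-shard in the batch just
    completed; `shards` is the shard list from a DescribeStream-style response.
    """
    remaining = {sid: seq for sid, seq in positions.items() if sid not in finished}
    for shard in shards:
        shard_id = shard["ShardId"]
        parents = [shard[key] for key in ("ParentShardId", "AdjacentParentShardId")
                   if shard.get(key)]
        if shard_id in positions or not parents:
            continue
        just_closed = any(p in finished for p in parents)
        still_reading = any(p in remaining for p in parents)
        # A merged shard waits until both of its parents are fully consumed.
        if just_closed and not still_reading:
            remaining[shard_id] = None
    return remaining


described = [
    {"ShardId": "shardId-0"},
    {"ShardId": "shardId-1", "ParentShardId": "shardId-0"},
    {"ShardId": "shardId-2", "ParentShardId": "shardId-0"},
    {"ShardId": "shardId-3", "ParentShardId": "shardId-1",
     "AdjacentParentShardId": "shardId-2"},
]

# Split: shardId-0 closes and both of its children become readable.
after_split = advance_after_reshard({"shardId-0": "100"}, {"shardId-0"}, described)

# Merge: shardId-3 is held back until shardId-1 and shardId-2 are both finished.
partial_merge = advance_after_reshard(
    {"shardId-1": "200", "shardId-2": "210"}, {"shardId-1"}, described)
full_merge = advance_after_reshard({"shardId-2": "215"}, {"shardId-2"}, described)
```

      In this model the number of keys in the map is the number of partitions in the next batch, which is one way to read "Spark parallelism maps directly to Kinesis parallelism".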

    Description

      I'd like to propose and vet the design for a direct connector between Spark and Kinesis.

          People

            Assignee: Unassigned
            Reporter: Lauren Moos (lauren.moos)
            Votes: 8
            Watchers: 16
