
[SPARK-27396] SPIP: Public APIs for extended Columnar Processing Support


Details

    • Type: Epic
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Epic Name: Public APIs for extended Columnar Processing Support

    Description

      SPIP: Columnar Processing Without Arrow Formatting Guarantees.

       

      Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.

      The Dataset/DataFrame API in Spark currently exposes data to users only one row at a time when processing it.  The goals of this proposal are to:

      1. Add to the current SQL extensions mechanism so advanced users can have access to the physical SparkPlan and manipulate it to provide columnar processing for existing operators, including shuffle.  This will allow them to implement their own cost-based optimizers to decide when processing should be columnar and when it should not (a minimal registration sketch follows this list).
      2. Make any transitions between the columnar memory layout and a row-based layout transparent to the users, so operations that are not columnar see the data as rows and operations that are columnar see the data as columns.
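      As a rough illustration of the extension hook in goal 1, here is a minimal sketch of registering an extension through the existing spark.sql.extensions mechanism; the class and package names are made up, and only the registration mechanism shown here exists today, with the columnar hook itself proposed in Appendix A.

       import org.apache.spark.sql.SparkSessionExtensions

       // Hypothetical extension class; Spark instantiates it when it is listed in
       // the spark.sql.extensions configuration.
       class MyAcceleratorExtensions extends (SparkSessionExtensions => Unit) {
         override def apply(ext: SparkSessionExtensions): Unit = {
           // existing hooks such as ext.injectOptimizerRule(...) already work here;
           // this proposal adds a columnar hook alongside them (see Appendix A)
         }
       }
       // enabled with: --conf spark.sql.extensions=com.example.MyAcceleratorExtensions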

       

      Not requirements, but things that would be nice to have:

      1. Transition the existing in-memory columnar layouts to be compatible with Apache Arrow.  This would make the transformations to the Apache Arrow format a no-op. The existing formats are already very close to those layouts in many cases.  This would not use the Apache Arrow Java library, but would instead be compatible with the memory layout, possibly only a subset of it.

       

      Q2. What problem is this proposal NOT designed to solve?

      The goal of this is not ML/AI but to provide APIs for accelerated computing in Spark, primarily targeting SQL/ETL-like workloads.  ML/AI frameworks already have several mechanisms to get data into and out of them. These can be improved, but that will be covered in a separate SPIP.

      This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation.

      This does not cover exposing the underlying format of the data.  The only way to get at the data in a ColumnVector is through the public APIs.  Exposing the underlying format to improve efficiency will be covered in a separate SPIP.
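      For context, here is a minimal sketch of what access through the public APIs means today; the helper name sumIntColumn is made up for illustration.

       import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

       // Reads values strictly through the public accessors, with no knowledge of
       // the underlying memory format.
       def sumIntColumn(batch: ColumnarBatch, colIndex: Int): Long = {
         val col: ColumnVector = batch.column(colIndex)
         var sum = 0L
         var row = 0
         while (row < batch.numRows()) {
           if (!col.isNullAt(row)) sum += col.getInt(row)
           row += 1
         }
         sum
       }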

      This is not trying to implement new ways of transferring data to external ML/AI applications.  That is covered by separate SPIPs already.

      This is not trying to add in generic code generation for columnar processing.  Currently code generation for columnar processing is only supported when translating columns to rows.  We will continue to support this, but will not extend it as a general solution. That will be covered in a separate SPIP if we find it is helpful.  For now columnar processing will be interpreted.

      This is not trying to expose a way to get columnar data into Spark through DataSource V2 or any other similar API.  That would be covered by a separate SPIP if we find it is needed.

       

      Q3. How is it done today, and what are the limits of current practice?

      The current columnar support is limited to 3 areas.

      1. Internal implementations of FileFormats can optionally return a ColumnarBatch instead of rows.  The code generation phase knows how to take that columnar data and iterate through it as rows for stages that want rows, which currently is almost everything.  The limitations here are mostly implementation specific. The current standard is to abuse Scala’s type erasure to return ColumnarBatches as the elements of an RDD[InternalRow] (sketched in the snippet after this list). The code generation can handle this because it is generating Java code, so it bypasses Scala’s type checking and just casts the InternalRow to the desired ColumnarBatch.  This makes it difficult for others to implement the same functionality for different processing because they can only do it through code generation. There really is no clean separate path in the code generation for columnar vs row-based processing. Additionally, because it is only supported through code generation, if code generation fails for any reason there is no fallback.  This is typically fine for input formats but can be problematic when we get into more extensive processing.
      2. When caching data, it can optionally be stored in a columnar format if the input is also columnar.  This is similar to the first area and has the same limitations because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
      3. Pandas vectorized processing.  To be able to support Pandas UDFs, Spark builds up a batch of data and sends it to Python for processing, then gets a batch of data back as a result.  The format of the data being sent to Python can be either pickle, which is the default, or optionally Arrow. The result is returned in the same format. The limitations here really are around performance:  transforming the data back and forth can be very expensive.
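      To make the type-erasure trick from item 1 concrete, here is a hedged sketch; the helper name is made up, but the cast is what the existing code effectively does.

       import org.apache.spark.rdd.RDD
       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.vectorized.ColumnarBatch

       // A ColumnarBatch RDD is exposed as RDD[InternalRow]; this only compiles
       // because of type erasure. Generated Java code later casts each element
       // back to ColumnarBatch when it knows the source is columnar.
       def smuggleBatches(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
         batches.asInstanceOf[RDD[InternalRow]]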

       

      Q4. What is new in your approach and why do you think it will be successful?

      What we are primarily doing is cleaning up a lot of existing functionality, refactoring it, and making it more generic. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be efficiently done in Spark.

       

      Q5. Who cares? If you are successful, what difference will it make?

      Anyone who wants to accelerate Spark.  At Spark+AI Summit this year, 2019, I spoke with multiple companies (7 by my count, including Facebook) trying to do this, using FPGAs, GPUs, or CPU SIMD instructions to get faster, more efficient processing.  This will help all of them provide a clean implementation of accelerated ETL processing, without hacks like overriding internal Spark classes by putting jars first on the classpath, which many of these companies are currently doing.

       

      Q6. What are the risks?

      Technologically I don’t see many risks.  We have done a proof of concept implementation that shows it can be done; it is just a matter of putting those changes in place.

       

      Q7. How long will it take?

      I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs would likely put it at a month and a half to two months, so one quarter should give us enough time to get through everything.

       

      Q8. What are the mid-term and final “exams” to check for success?

      The first check for success would be to successfully transition the existing columnar code over to using this behind the scenes. That would be separating out the code that goes from rows to columns when sending the data to python for processing by Pandas UDFs, and also the code that goes from columns to rows for the file formats, caching, and the output of Pandas UDFs.

       

      The final success is really about adoption, and seeing whether the follow-on work that we, and hopefully others, do shows that it cleanly enables something that was not possible before.

       

      Appendix A:

      For the most part the APIs will not need to change in any backwards incompatible way.  Also note that these are not necessarily final changes to the APIs, but mostly reflect the general direction that we want to go in, so there is no need to include nits in the discussion on the APIs.  Those can be covered by code reviews.

       

      ColumnarBatch and ColumnVector will need to move from one jar to another to allow the catalyst Expression class to have access to them.

       

      Expression will have the following added to it:

       

       /**
        * Returns true if this expression supports columnar processing through [[columnarEval]].
        */
       def supportsColumnar: Boolean = false
      
       /**
        * Returns the result of evaluating this expression on the entire ColumnarBatch. The result of
        * calling this may be a single ColumnVector, or a scalar value. Scalar values can happen if they
        * are a part of the expression or in some cases may be an optimization, like using the batch's
        * null count to know that isNull is false for the entire batch without doing any calculations.
        */
       def columnarEval(batch: ColumnarBatch): Any = {
         throw new IllegalStateException(s"Internal Error ${this.getClass} has column support mismatch")
       }
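      As a hedged sketch of the columnarEval contract, the standalone helper below (the name is made up, and it uses the internal OnHeapColumnVector purely for illustration) produces a single ColumnVector for the whole batch; returning a scalar, for example for a literal, would be the other valid result shape.

       import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
       import org.apache.spark.sql.types.IntegerType
       import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

       // Illustrative columnar evaluation: add one to every non-null value of an
       // int column and return the result as a new vector.
       def incrementColumn(batch: ColumnarBatch, colIndex: Int): ColumnVector = {
         val input = batch.column(colIndex)
         val out = new OnHeapColumnVector(batch.numRows(), IntegerType)
         var row = 0
         while (row < batch.numRows()) {
           if (input.isNullAt(row)) out.putNull(row)
           else out.putInt(row, input.getInt(row) + 1)
           row += 1
         }
         out
       }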
      

       

      SparkPlan will be updated to include:

       /**
        * Return true if this stage of the plan supports columnar execution.
        */
       def supportsColumnar: Boolean = false
      
       /**
        * The exact types of the columns that are output in columnar processing mode (used for faster codegen of transitions from columns to rows).
        */
       def vectorTypes: Option[Seq[String]] = None
      
       /**
        * Returns the result of this query as an RDD[ColumnarBatch] by delegating to `doExecuteColumnar`
        * after preparations.
        *
        * Concrete implementations of SparkPlan should override `doExecuteColumnar` if `supportsColumnar`
        * returns true.
        */
       final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
          if (isCanonicalizedPlan) {
              throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
          }
          doExecuteColumnar()
       }
      
       /**
        * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
        * true.
        */
       protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
          // If the user overrides supportsColumnar but not this method, blow up.
          throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
          s" mismatch:\n${this}")
       }
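      As a hedged sketch of how an operator could plug into these additions (PassThroughColumnarExec is an invented name, not part of the proposal), an exec node would advertise columnar support and implement only the columnar path:

       import org.apache.spark.rdd.RDD
       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.catalyst.expressions.Attribute
       import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
       import org.apache.spark.sql.vectorized.ColumnarBatch

       // Forwards the child's batches unchanged; a real operator would transform them.
       case class PassThroughColumnarExec(child: SparkPlan) extends UnaryExecNode {
         override def output: Seq[Attribute] = child.output
         override def supportsColumnar: Boolean = true
         // the planner should never pick the row-based path for this node
         override protected def doExecute(): RDD[InternalRow] =
           throw new IllegalStateException("row-based execution was not expected here")
         override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
           child.executeColumnar()
       }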
      

      In BufferedRowIterator, the init method will change to reflect that the values in it could be ColumnarBatches too, which is already the case today.

      -  public abstract void init(int index, Iterator<InternalRow>[] iters);
      +  public abstract void init(int index, Iterator<Object>[] iters);
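      To illustrate why the element type is relaxed to Object, here is a hedged Scala sketch of a consumer of such an iterator (the function name is made up):

       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.vectorized.ColumnarBatch

       // Elements may be whole batches or single rows, so the consumer handles both.
       def consume(iter: Iterator[Any]): Unit = iter.foreach {
         case batch: ColumnarBatch => println(s"batch of ${batch.numRows()} rows")
         case row: InternalRow     => println(s"row with ${row.numFields} fields")
         case other                => throw new IllegalStateException(s"unexpected element: $other")
       }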
      

       
      SparkSessionExtensions will have new APIs for columnar processing.

      type ColumnarRuleBuilder = SparkSession => ColumnarRule
      
      def injectColumnar(builder: ColumnarRuleBuilder): Unit
      
      /**
       * :: Experimental ::
       * Holds a rule that runs prior to inserting column-to-row and row-to-column transitions to
       * allow for injecting a columnar implementation into various operators, and a rule that
       * runs after to allow overriding the implementation of those transitions, and potentially
       * cleaning up the plan (like inserting batching of columnar data for more efficient processing).
       */
      @DeveloperApi
      @Experimental
      @Unstable
      class ColumnarRule {
        def pre(plan: SparkPlan): SparkPlan = plan
        def post(plan: SparkPlan): SparkPlan = plan
      }
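      A minimal usage sketch, assuming the APIs above land as proposed; MyColumnarRule and the no-op rule bodies are placeholders, and the import location for ColumnarRule assumes it ends up alongside SparkSessionExtensions.

      import org.apache.spark.sql.{ColumnarRule, SparkSession}
      import org.apache.spark.sql.execution.SparkPlan

      // Placeholder rule: a real implementation would swap in columnar operators in
      // pre() and adjust the inserted transitions or add batching in post().
      class MyColumnarRule extends ColumnarRule {
        override def pre(plan: SparkPlan): SparkPlan = plan
        override def post(plan: SparkPlan): SparkPlan = plan
      }

      val spark = SparkSession.builder()
        .withExtensions(ext => ext.injectColumnar(session => new MyColumnarRule))
        .getOrCreate()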
      

People

    Assignee: Robert Joseph Evans (revans2)
    Reporter: Robert Joseph Evans (revans2)
    Shepherd: Thomas Graves