Affects Version/s: 3.0.0
Fix Version/s: None
Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.
The Dataset/DataFrame API in Spark currently only exposes to users one row at a time when processing data. The goals of this are to
- Expose to end users a new option of processing the data in a columnar format, multiple rows at a time, with the data organized into contiguous arrays in memory.
- Make any transitions between the columnar memory layout and a row based layout transparent to the end user.
- Allow for simple data exchange with other systems, DL/ML libraries, pandas, etc. by having clean APIs to transform the columnar data into an Apache Arrow compatible layout.
- Provide a plugin mechanism for columnar processing support so an advanced user could avoid data transition between columnar and row based processing even through shuffles. This means we should at least support pluggable APIs so an advanced end user can implement the columnar partitioning themselves, and provide the glue necessary to shuffle the data still in a columnar format.
- Expose new APIs that allow advanced users or frameworks to implement columnar processing either as UDFs, or by adjusting the physical plan to do columnar processing. If the latter is too controversial we can move it to another SPIP, but we plan to implement some accelerated computing in parallel with this feature to be sure the APIs work, and without this feature it makes that impossible.
Not Requirements, but things that would be nice to have.
- Provide default implementations for partitioning columnar data, so users don’t have to.
- Transition the existing in memory columnar layouts to be compatible with Apache Arrow. This would make the transformations to Apache Arrow format a no-op. The existing formats are already very close to those layouts in many cases. This would not be using the Apache Arrow java library, but instead being compatible with the memory layout and possibly only a subset of that layout.
- Provide a clean transition from the existing code to the new one. The existing APIs which are public but evolving are not that far off from what is being proposed. We should be able to create a new parallel API that can wrap the existing one. This means any file format that is trying to support columnar can still do so until we make a conscious decision to deprecate and then turn off the old APIs.
Q2. What problem is this proposal NOT designed to solve?
This is not trying to implement any of the processing itself in a columnar way, with the exception of examples for documentation, and possibly default implementations for partitioning of columnar shuffle.
Q3. How is it done today, and what are the limits of current practice?
The current columnar support is limited to 3 areas.
- Input formats, optionally can return a ColumnarBatch instead of rows. The code generation phase knows how to take that columnar data and iterate through it as rows for stages that wants rows, which currently is almost everything. The limitations here are mostly implementation specific. The current standard is to abuse Scala’s type erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The code generation can handle this because it is generating java code, so it bypasses scala’s type checking and just casts the InternalRow to the desired ColumnarBatch. This makes it difficult for others to implement the same functionality for different processing because they can only do it through code generation. There really is no clean separate path in the code generation for columnar vs row based. Additionally because it is only supported through code generation if for any reason code generation would fail there is no backup. This is typically fine for input formats but can be problematic when we get into more extensive processing.
- When caching data it can optionally be cached in a columnar format if the input is also columnar. This is similar to the first area and has the same limitations because the cache acts as an input, but it is the only piece of code that also consumes columnar data as an input.
- Pandas vectorized processing. To be able to support Pandas UDFs Spark will build up a batch of data and send it python for processing, and then get a batch of data back as a result. The format of the data being sent to python can either be pickle, which is the default, or optionally Arrow. The result returned is the same format. The limitations here really are around performance. Transforming the data back and forth can be very expensive.
Q4. What is new in your approach and why do you think it will be successful?
Conceptually what we are primarily doing is cleaning up a lot of existing functionality and getting it ready to be exposed as public facing APIs. We think we can be successful because we have already completed a proof of concept that shows columnar processing can be efficiently done in Spark.
Q5. Who cares? If you are successful, what difference will it make?
Anyone who wants to integrate Spark with other data processing systems will care, as it provides a cleaner, more standards-based way of transferring the data. Anyone who wants to experiment with accelerated computing, either on a CPU or GPU, will benefit as it provides a supported way to make this work instead of trying to hack something in that was not available before.
Q6. What are the risks?
Technologically I don’t see many risks. We have done a proof of concept implementation that shows it can be done. This biggest risks are if Apache Arrow loses favor as an interchange format between different systems. In the worst cast that means we may have to support transforming the data into other types, but because Arrow as the internal format is not a requirement and there really are not that many ways to lay out columnar data, it should require minor changes if anything.
The second risk is if we didn’t setup the API in a powerful/flexible enough way for users to really take full advantage of it. This is really going to come with time, and if we start off fairly conservatively in our API we can hopefully expand it in the future.
The final risk is around dictionaries. The current columnar layout exposes the dictionary as just a java class. When we go to an Arrow compatible layout the dictionary has a specific format. The Parquet APIs don’t give raw access to the layout of the dictionary. This is not something that we cannot overcome, it just means we may have less than optimal translations, in terms of performance, when going from Parquet formatted data to Arrow until we can get some changes into the Parquet API. This may be true of Orc as well. It is no worse off than it is today already.
Q7. How long will it take?
I suspect that we can put together a patch with tests in a month. Adding documentation and iterating on the APIs I would suspect would put it at a month and a half to two months. So one quarter would give us enough time to get through everything, including reviews if things go well.
Q8. What are the mid-term and final “exams” to check for success?
The first check for success would be to successfully transition the existing columnar code over to using this. That would be transforming and sending the data to python for processing by Pandas UDFs, and the file formats and caching code to use a cleaner more explicit columnar API.
The final success is really about adoption and seeing if the follow on work that we, and hopefully others, have shows that it cleanly enables something that was not possible before.