When a Sort is present in the subquery between Lateral and Unnest, it needs to handle the EMIT outcome correctly. This means that when an EMIT is received, Sort performs the sort operation on the records buffered so far and produces output from them. After the EMIT, Sort resets its state and works on the next batches of incoming records until another EMIT is seen.
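The buffer, sort-on-EMIT, and reset cycle described above can be sketched with a toy model. This is only an illustration under simplifying assumptions: ToyOutcome and ToyEmitSort are hypothetical names, rows are plain integers, and nothing here is Drill code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: these names are hypothetical and are not Drill classes.
enum ToyOutcome { OK, EMIT, NONE }

final class ToyEmitSort {
  private final List<Integer> buffered = new ArrayList<>();

  // Buffers one incoming batch. Returns the sorted output when the batch
  // closes an EMIT boundary (or the stream ends); otherwise returns null.
  List<Integer> onBatch(List<Integer> batch, ToyOutcome outcome) {
    buffered.addAll(batch);
    if (outcome == ToyOutcome.OK) {
      return null; // keep buffering until a boundary is seen
    }
    List<Integer> out = new ArrayList<>(buffered);
    Collections.sort(out);
    buffered.clear(); // refresh state for the next EMIT boundary
    return out;
  }
}
```

Each EMIT boundary is sorted independently: records from one boundary never appear in the output of the next.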
For the first cut, Sort will not support spilling in the subquery between Lateral and Unnest, since spilling is very unlikely there. The worst case is that Lateral gets a batch with only 1 row of data because the repeated-type column data is too big. In that case Unnest produces only 1 output batch, and Sort or any other blocking operator needs enough memory to hold at least 1 incoming batch anyway. So in the ideal case spilling should not happen. If there is an operator between Sort and Unnest that increases the data size, Sort might need to spill, but that is not a common case for now.
Description of Changes:
Currently the sort operator is implemented as described below. This gives a high-level view of how Sort works and of how EMIT support was implemented.
1) In the buildSchema phase, Sort creates an empty container with SV NONE and sends it downstream.
2) After the buildSchema phase it goes into a LOAD phase and keeps calling next() on upstream until it sees NONE or there is a failure.
3) For each batch it receives, it applies an SV2 if the batch does not already have one, sorts the batch, and then buffers it after converting it into a BatchGroup.InputBatch.
4) During buffering it checks for memory pressure and spills as needed.
5) Once all the batches are received and it gets NONE from upstream, it starts a merge phase.
6) In the merge phase it checks whether the merge can happen in memory or spilling is needed, and performs the merge accordingly.
7) Sort has a concept of SortResults, which represents the different kinds of output container that Sort can generate based on input batches and memory conditions. For example, for an in-memory merge the output container of Sort is an SV4 container with SortResults of type MergeSortWrapper. For spill and merge the container is of SV_NONE type with SortResults of type BatchMerger. There are also SortResults types for empty and single batches (not used anywhere).
8) SortResults provides an abstraction that produces an output result with each next() call to it, backed by the output container of the ExternalSortRecordBatch along with the correct recordCount and SV2/SV4 as needed. For example, in the case of MergeSortWrapper all the inputs are in memory, and hence all output is also in memory, backed by an SV4. On each next() call the SV4 is updated with the start index and length, which inform the caller about the record boundary that it should consume. For BatchMerger, based on memory pressure and the number of records Sort can output with each output container, it fills the output container with that many records and sends it downstream.
9) The SortResults abstraction is also such that at the beginning of the merge phase the output container held by SortResults is cleared, and it is later re-initialized after the merge is completed.
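The in-memory case in step 8 can be pictured with a small sketch: the data stays where it is, and a sorted index (playing the role of the SV4) is exposed one window at a time through a start index and length. ToySortResults and ToyInMemoryResults are hypothetical names invented for this illustration and do not mirror the real SortResults interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the SortResults idea; not the Drill interface.
interface ToySortResults {
  boolean next();          // advance to the next output window
  List<Integer> current(); // records visible in the current window
}

// In-memory case: the data is never copied; only a sorted index over it
// is built, and each next() call moves the (start, length) window.
final class ToyInMemoryResults implements ToySortResults {
  private final int[] data;
  private final Integer[] index; // sorted positions into data
  private final int windowSize;
  private int start = 0;
  private int length = 0;

  ToyInMemoryResults(int[] data, int windowSize) {
    this.data = data;
    this.windowSize = windowSize;
    this.index = new Integer[data.length];
    for (int i = 0; i < data.length; i++) {
      index[i] = i;
    }
    Arrays.sort(index, (a, b) -> Integer.compare(data[a], data[b]));
  }

  @Override
  public boolean next() {
    start += length; // skip the window consumed by the previous call
    length = Math.min(windowSize, index.length - start);
    return length > 0;
  }

  @Override
  public List<Integer> current() {
    List<Integer> out = new ArrayList<>();
    for (int i = start; i < start + length; i++) {
      out.add(data[index[i]]);
    }
    return out;
  }
}
```

The caller only ever sees the current window, which is the same contract the text describes for an SV4 updated with a start index and length.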
In the current implementation, since Sort is a blocking operator, it cleared the output container ValueVectors after the buildSchema phase and in the load phase. It then created the final output container (with ValueVector objects) after it had seen all the incoming data. The very first output batch is always returned with OK_NEW_SCHEMA so that the downstream operator can set up the correct SV mode and schema from the first output batch, since the schema returned in the buildSchema phase was a dummy one. The vector references that the downstream operator kept from the buildSchema phase are thereby replaced with the vector references in the first output batch.
With EMIT, however, Sort goes into the load phase multiple times, so we cannot clear the output container of Sort after each EMIT boundary. If we did, the references that the downstream operator holds to the ValueVectors of the ExternalSort output container would become invalid, and Sort would also have to send OK_NEW_SCHEMA with the first output batch of each EMIT boundary. To avoid that, we need to make sure that at each EMIT boundary Sort goes from the load phase to the merge phase to produce output, and that the output container is not cleared in either of these phases.
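The reference problem described above can be shown with a toy model. ToyVector and ToyOutputContainer are hypothetical stand-ins, not Drill's ValueVector or VectorContainer; the sketch only contrasts replacing a vector object with refilling the same object in place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not Drill's ValueVector or VectorContainer.
final class ToyVector {
  final List<Integer> values = new ArrayList<>();
}

final class ToyOutputContainer {
  private ToyVector vector = new ToyVector();

  ToyVector getVector() {
    return vector;
  }

  // Replaces the vector object: a reference taken earlier by a downstream
  // consumer no longer sees new data, so a new-schema signal would be needed.
  void rebuild(List<Integer> data) {
    vector = new ToyVector();
    vector.values.addAll(data);
  }

  // Keeps the same vector object and only moves new data into it, so the
  // reference held downstream stays valid across boundaries.
  void transferIn(List<Integer> data) {
    vector.values.clear();
    vector.values.addAll(data);
  }
}
```

A downstream consumer that cached getVector() keeps seeing fresh data after transferIn, but is left with a stale object after rebuild.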
To achieve the above and keep the code changes to a minimum, the following approach is used:
1) A wrapper output container (outputWrapperContainer) and a wrapper SV4 (outputSV4) are created.
2) This outputWrapperContainer is provided to SortResults instead of the actual container that carries the output of ExternalSortBatch.
3) For each EMIT boundary the load and merge phases happen. The data for an EMIT subquery is expected to be small, so the sort will mostly happen in memory.
4) During each merge phase across EMIT boundaries the wrapper container is created with a fresh set of vectors, as was done earlier: the container is cleared at the start of the merge phase and re-created with fresh vectors.
5) Once the merge phase is done, prepareOutputContainer is called, which calls the new updateOutputContainer method of SortResults.
6) Since we do not support spilling with EMIT, updateOutputContainer of BatchMerger checks the outcome and throws an exception as UnsupportedOperation. MergeSortWrapper::updateOutputContainer, for the very first output, creates the vectors in Sort's actual output container from the vectors in the SortResults outputWrapperContainer. For other output batches returned across EMIT boundaries, the same Sort output container is used with the same ValueVectors; only the data is transferred in from the SortResults outputWrapperContainer.
7) There is a new flag called firstBatchOfSchema which, when set to true, indicates that this is the first output batch of the current schema. In scenarios like a schema change this flag is set to true, and the first output container is populated with a fresh set of ValueVectors and returned downstream with the OK_NEW_SCHEMA outcome.
8) One challenge is which IterOutcome to return with each output batch. For example, for the very first EMIT boundary Sort has to return OK_NEW_SCHEMA along with the EMIT outcome. For a subsequent EMIT boundary, if the output spans multiple batches, all the output batches should be returned with OK except the last output batch for that EMIT boundary, which is returned with EMIT. All these cases are handled in a single method, getFinalOutcome, which returns the correct IterOutcome for each output batch.
9) Once EMIT is returned for the current input EMIT boundary, the Sort state is reset, which involves clearing the data structures and setting the Sort state so that the subsequent next() call is handled properly. resetSortState also contains the logic to not clear the memory of the very last output batch, which is sent with the NONE outcome, because of a dependency in StreamAgg.
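The outcome selection in steps 7 and 8 can be sketched roughly as follows. This is an illustration only, not the actual getFinalOutcome body: ToyIterOutcome, ToyOutcomePicker, and the lastBatchOfBoundary parameter are assumptions made for the example. In particular, how the EMIT is signalled when the first batch of a schema is also the last batch of its boundary is left out of the sketch.

```java
// Hypothetical sketch; not the actual ExternalSortBatch code.
enum ToyIterOutcome { OK, OK_NEW_SCHEMA, EMIT }

final class ToyOutcomePicker {
  // True until the first output batch of the current schema has been sent.
  private boolean firstBatchOfSchema = true;

  // Called on a schema change so the next batch carries OK_NEW_SCHEMA again.
  void onSchemaChange() {
    firstBatchOfSchema = true;
  }

  // Picks the outcome for one output batch of the current EMIT boundary.
  ToyIterOutcome getFinalOutcome(boolean lastBatchOfBoundary) {
    if (firstBatchOfSchema) {
      firstBatchOfSchema = false;
      return ToyIterOutcome.OK_NEW_SCHEMA;
    }
    // Intermediate batches go out with OK; the last one closes the boundary.
    return lastBatchOfBoundary ? ToyIterOutcome.EMIT : ToyIterOutcome.OK;
  }
}
```

Keeping the decision in one place means the callers that produce output batches do not each need to track the schema flag and the boundary position themselves.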
If we want to support spilling with the EMIT outcome, that can also be achieved with the current design. BatchMerger::updateOutputContainer would contain logic to populate the actual output container of Sort as an SV4 container instead of the SV NONE container that BatchMerger currently returns. When spilling happens for some EMIT boundaries and an in-memory merge for others, we cannot send 2 different types of container from Sort (SV_NONE for spilling and SV4 for in-memory merge). If we did, OK_NEW_SCHEMA would need to be sent with each output batch, since the vector references would have to change for downstream. Instead we can always produce an output container of SV4 type, so that when spilling occurs across an EMIT boundary the same output container with the same ValueVector references is populated with new data. Hence no OK_NEW_SCHEMA needs to be sent downstream. The one disadvantage of this approach is that if Sort sends an SV4 container instead of SV NONE in the spilling case as well, then somewhere downstream RemovingRecordBatch has to copy the data again from the SV4 to create a regular SV NONE container.