Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Not A Problem
None
None
Description
The original intent of this issue was to refactor InMemoryScan to use an iterator, making it more flexible:
Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.
- the outer Vec separates the partitions
- the inner Vec contains all the RecordBatches of one partition
The inner Vec is then converted into an iterator when the LogicalPlan is turned into a PhysicalPlan.
I suggest that InMemoryScan should take a Vec<Iter<RecordBatch>> instead. This would make it possible to plug custom scan implementations into DataFusion without reading their data entirely into memory. It would still work seamlessly with Vec<Vec<RecordBatch>>, which would just need to be converted first with data.into_iter().map(|x| x.into_iter()).
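A minimal sketch of the iterator-based proposal, assuming each partition is a boxed `Iterator<Item = RecordBatch> + Send` rather than literally `std::slice::Iter` (which would yield references instead of owned batches). `RecordBatch` here is a hypothetical stand-in holding only a row count, and `PartitionStream`, `from_vecs`, and `count_rows` are illustrative names, not DataFusion APIs. It shows that in-memory data and a lazily generated source can both feed the same scan.

```rust
/// Hypothetical stand-in for Arrow's RecordBatch: only a row count.
#[derive(Debug, Clone, PartialEq)]
pub struct RecordBatch {
    pub num_rows: usize,
}

/// One partition as a lazily evaluated stream of batches.
pub type PartitionStream = Box<dyn Iterator<Item = RecordBatch> + Send>;

/// Converts the current Vec<Vec<RecordBatch>> layout into one iterator per partition.
pub fn from_vecs(data: Vec<Vec<RecordBatch>>) -> Vec<PartitionStream> {
    data.into_iter()
        .map(|partition| Box::new(partition.into_iter()) as PartitionStream)
        .collect()
}

/// Consumes every partition and counts rows, as a scan executor might.
pub fn count_rows(partitions: Vec<PartitionStream>) -> usize {
    partitions
        .into_iter()
        .map(|p| p.map(|b| b.num_rows).sum::<usize>())
        .sum()
}

fn main() {
    // Existing in-memory data: two partitions.
    let in_memory = from_vecs(vec![
        vec![RecordBatch { num_rows: 3 }, RecordBatch { num_rows: 2 }],
        vec![RecordBatch { num_rows: 5 }],
    ]);
    println!("{}", count_rows(in_memory)); // prints 10

    // A generated source whose batches are produced on demand, never all held at once.
    let generated: PartitionStream = Box::new((1..=4).map(|n| RecordBatch { num_rows: n }));
    println!("{}", count_rows(vec![generated])); // prints 10
}
```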
After further inspection (see discussion below), it seems more appropriate to completely refactor the way scan nodes are organized. The idea is to replace all specific XxxScan nodes with a generic SourceScan node:
/// A node that generates source data
LogicalPlan::SourceScan {
    /// A shared reference to the source implementation
    scanner: Arc<dyn SourceScanner>,
},
with:
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatchReader;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::physical_plan::Partitioning;

/// A scanner implementation that can be used by datafusion
#[async_trait]
pub trait SourceScanner: Send + Sync + fmt::Debug {
    /// reference to the schema of the data as it will be read by this scanner
    fn projected_schema(&self) -> &SchemaRef;
    /// string display of this scanner
    fn format(&self) -> &str;
    /// apply projection on this scanner
    fn project(
        &self,
        required_columns: &HashSet<String>,
        has_projection: bool,
    ) -> Result<Arc<dyn SourceScanner>>;
    /// get scanner partitioning
    fn output_partitioning(&self) -> Partitioning;
    /// get iterator for a given partition
    async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>>;
}
The existing specific scan nodes would then be replaced by implementations of SourceScanner.
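To illustrate how InMemoryScan could become one implementation of SourceScanner, here is a simplified sketch. It drops `async_trait` and Arrow so it compiles with the standard library alone: `Schema` is just a list of column names, `Batch` is one `Vec<i64>` per column, `Partitioning` is reduced to a partition count, and `RecordBatchReader` is replaced by a boxed iterator. `MemoryScanner` and `run_plan` are hypothetical names, and the projection behaviour (keep only `required_columns` when `has_projection` is true, otherwise keep everything) is an assumption about what the proposed `project` method would do.

```rust
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

// Simplified stand-ins for the Arrow/DataFusion types (assumptions, not the real APIs).
pub type Schema = Vec<String>; // column names only
pub type Batch = Vec<Vec<i64>>; // one Vec<i64> per column
pub type Result<T> = std::result::Result<T, String>;

/// Synchronous, simplified version of the proposed SourceScanner trait.
pub trait SourceScanner: Send + Sync + fmt::Debug {
    fn projected_schema(&self) -> &Schema;
    fn format(&self) -> &str;
    fn project(&self, required_columns: &HashSet<String>, has_projection: bool)
        -> Result<Arc<dyn SourceScanner>>;
    /// Number of partitions (stands in for `Partitioning`).
    fn output_partitioning(&self) -> usize;
    /// Batches of one partition (a boxed iterator stands in for `RecordBatchReader`).
    fn execute(&self, partition: usize) -> Result<Box<dyn Iterator<Item = Batch> + Send>>;
}

/// The generic scan node replacing the specific XxxScan variants.
#[derive(Debug)]
pub enum LogicalPlan {
    SourceScan { scanner: Arc<dyn SourceScanner> },
}

/// Hypothetical in-memory scanner: one way InMemoryScan could be expressed.
#[derive(Debug)]
pub struct MemoryScanner {
    schema: Schema,
    partitions: Vec<Vec<Batch>>,
}

impl SourceScanner for MemoryScanner {
    fn projected_schema(&self) -> &Schema { &self.schema }
    fn format(&self) -> &str { "MemoryScanner" }
    fn output_partitioning(&self) -> usize { self.partitions.len() }

    fn project(&self, required_columns: &HashSet<String>, has_projection: bool)
        -> Result<Arc<dyn SourceScanner>> {
        // Indices of the columns to keep; all of them when there is no projection.
        let keep: Vec<usize> = (0..self.schema.len())
            .filter(|&i| !has_projection || required_columns.contains(&self.schema[i]))
            .collect();
        let schema = keep.iter().map(|&i| self.schema[i].clone()).collect();
        // Copy only the kept columns of every batch in every partition.
        let partitions = self.partitions.iter()
            .map(|batches| batches.iter()
                .map(|batch| keep.iter().map(|&i| batch[i].clone()).collect::<Batch>())
                .collect::<Vec<Batch>>())
            .collect();
        Ok(Arc::new(MemoryScanner { schema, partitions }))
    }

    fn execute(&self, partition: usize) -> Result<Box<dyn Iterator<Item = Batch> + Send>> {
        let batches = self.partitions.get(partition)
            .ok_or_else(|| format!("partition {} out of range", partition))?
            .clone();
        Ok(Box::new(batches.into_iter()))
    }
}

/// Drains every partition of a SourceScan node and sums all values.
pub fn run_plan(plan: &LogicalPlan) -> Result<i64> {
    match plan {
        LogicalPlan::SourceScan { scanner } => {
            let mut total = 0;
            for p in 0..scanner.output_partitioning() {
                for batch in scanner.execute(p)? {
                    total += batch.iter().flatten().sum::<i64>();
                }
            }
            Ok(total)
        }
    }
}

fn main() -> Result<()> {
    // Two partitions; each batch holds columns "a" and "b".
    let scanner = MemoryScanner {
        schema: vec!["a".to_string(), "b".to_string()],
        partitions: vec![
            vec![vec![vec![1, 2], vec![10, 20]]], // partition 0: one batch
            vec![vec![vec![3], vec![30]]],        // partition 1: one batch
        ],
    };
    let required: HashSet<String> = vec!["a".to_string()].into_iter().collect();
    let projected = scanner.project(&required, true)?;
    let plan = LogicalPlan::SourceScan { scanner: projected };
    println!("{}", run_plan(&plan)?); // prints 6 (1 + 2 + 3, column "a" only)
    Ok(())
}
```

In this sketch the plan node stays a single variant no matter how many kinds of sources exist, and because `project` returns a new scanner rather than mutating the old one, a scanner can be shared behind an `Arc` while its partitions are executed independently.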