  1. Apache Arrow
  2. ARROW-10368

[Rust][DataFusion] Refactor scan nodes to allow extensions

Details

    Description

      The original intent of this issue was to refactor InMemoryScan to take iterators, which would make it more flexible.

      Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.

      • the outer Vec separates the partitions
      • the inner Vec contains all the RecordBatches for one partition
        The inner Vec is then converted into an iterator when the LogicalPlan is turned into a PhysicalPlan.

      I suggest that InMemoryScan take a Vec<Iter<RecordBatch>> instead, that is, one iterator of RecordBatch per partition. This would make it possible to plug custom scan implementations into DataFusion without reading their data entirely into memory. Existing Vec<Vec<RecordBatch>> data would still work almost seamlessly: it would only need to be converted first, for example with data.into_iter().map(|x| x.into_iter()).
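The conversion described above can be sketched as follows. This is only an illustration under stated assumptions: RecordBatch here is a std-only stand-in for arrow's type, and PartitionIter, into_partition_iters and generated_partition are hypothetical names that do not exist in DataFusion.

```rust
// Stand-in for arrow's RecordBatch so the sketch compiles with std only (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct RecordBatch {
    pub num_rows: usize,
}

/// One lazily consumed partition (hypothetical alias, not a DataFusion type).
pub type PartitionIter = Box<dyn Iterator<Item = RecordBatch> + Send>;

/// Adapt fully materialized partitions to the iterator-based shape:
/// the outer Vec still separates partitions, each inner Vec becomes an iterator.
pub fn into_partition_iters(data: Vec<Vec<RecordBatch>>) -> Vec<PartitionIter> {
    data.into_iter()
        .map(|partition| Box::new(partition.into_iter()) as PartitionIter)
        .collect()
}

/// A partition that produces its batches on demand instead of holding
/// them all in memory, which is what a custom scan could provide.
pub fn generated_partition(batches: usize, rows_per_batch: usize) -> PartitionIter {
    Box::new((0..batches).map(move |_| RecordBatch {
        num_rows: rows_per_batch,
    }))
}
```

With this shape, in-memory data and lazily produced data can sit side by side in the same Vec of partitions, and the consumer does not need to know which is which.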

      After further inspection (see discussion below), it seems more appropriate to completely refactor the way scan nodes are organized. The idea is to replace all specific XxxScan nodes with a generic SourceScan node:

      /// A node that generates source data
      LogicalPlan::SourceScan {
          /// A shared reference to the source implementation
          scanner: Arc<dyn SourceScanner>,
      },
      

      with:

      #[async_trait]
      /// A scanner implementation that can be used by datafusion
      pub trait SourceScanner: Send + Sync + fmt::Debug {
      
        /// reference to the schema of the data as it will be read by this scanner
        fn projected_schema(&self) -> &SchemaRef;
      
        /// string display of this scanner
        fn format(&self) -> &str;
      
        /// apply projection on this scanner
        fn project(
          &self,
          required_columns: &HashSet<String>,
          has_projection: bool,
        ) -> Result<Arc<dyn SourceScanner>>;
      
        /// get scanner partitioning
        fn output_partitioning(&self) -> Partitioning;
      
        /// get iterator for a given partition
        async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>>;
      
      }
      

      The existing specific scan nodes (such as InMemoryScan) would then be provided as implementations of SourceScanner.
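To make the proposal more concrete, here is a rough sketch of how an in-memory source might implement such a trait. It is deliberately simplified and rests on several assumptions: the trait is synchronous (no async_trait), Schema and Batch are std-only stand-ins for arrow's SchemaRef and RecordBatch, Partitioning is reduced to a partition count, errors are plain strings, and MemoryScanner is a hypothetical name. The meaning given to has_projection (false keeps every column) is also a guess.

```rust
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

/// Stand-in for arrow's SchemaRef: just the column names (assumption).
pub type Schema = Vec<String>;

/// Stand-in for arrow's RecordBatch: integer columns in schema order (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub columns: Vec<Vec<i64>>,
}

pub type ScanResult<T> = Result<T, String>;

/// Simplified, synchronous variant of the proposed trait.
pub trait SourceScanner: Send + Sync + fmt::Debug {
    fn projected_schema(&self) -> &Schema;
    fn format(&self) -> &str;
    fn project(
        &self,
        required_columns: &HashSet<String>,
        has_projection: bool,
    ) -> ScanResult<Arc<dyn SourceScanner>>;
    /// Reduced to a partition count in this sketch.
    fn output_partitioning(&self) -> usize;
    fn execute(&self, partition: usize) -> ScanResult<Box<dyn Iterator<Item = Batch> + Send>>;
}

/// Hypothetical in-memory source: the outer Vec separates partitions.
#[derive(Debug)]
pub struct MemoryScanner {
    schema: Schema,
    partitions: Vec<Vec<Batch>>,
}

impl MemoryScanner {
    pub fn new(schema: Schema, partitions: Vec<Vec<Batch>>) -> Self {
        Self { schema, partitions }
    }
}

impl SourceScanner for MemoryScanner {
    fn projected_schema(&self) -> &Schema {
        &self.schema
    }

    fn format(&self) -> &str {
        "MemoryScanner"
    }

    fn project(
        &self,
        required_columns: &HashSet<String>,
        has_projection: bool,
    ) -> ScanResult<Arc<dyn SourceScanner>> {
        // Indices of the columns to keep; without a projection, keep them all.
        let keep: Vec<usize> = self
            .schema
            .iter()
            .enumerate()
            .filter(|(_, name)| !has_projection || required_columns.contains(*name))
            .map(|(i, _)| i)
            .collect();
        let schema: Schema = keep.iter().map(|&i| self.schema[i].clone()).collect();
        let partitions: Vec<Vec<Batch>> = self
            .partitions
            .iter()
            .map(|batches| {
                batches
                    .iter()
                    .map(|b| Batch {
                        columns: keep.iter().map(|&i| b.columns[i].clone()).collect(),
                    })
                    .collect()
            })
            .collect();
        Ok(Arc::new(MemoryScanner { schema, partitions }))
    }

    fn output_partitioning(&self) -> usize {
        self.partitions.len()
    }

    fn execute(&self, partition: usize) -> ScanResult<Box<dyn Iterator<Item = Batch> + Send>> {
        // Clone the requested partition so the returned iterator owns its data.
        let batches = self
            .partitions
            .get(partition)
            .ok_or_else(|| format!("no partition {}", partition))?
            .clone();
        Ok(Box::new(batches.into_iter()))
    }
}
```

A file-backed or remote source could implement the same trait and return an iterator from execute that reads batches lazily, which is the extension point the SourceScan node is meant to open up.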

            People

              Unassigned
              rdettai Rémi Dettai
              Votes: 0
              Watchers: 3

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 50m