SPARK-25723: Spark SQL external data source question


Details

    • Type: Question
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None
    • Environment: local mode

    Description

      import java.util.Map;

      import org.apache.spark.Partition;
      import org.apache.spark.rdd.RDD;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SQLContext;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.sources.BaseRelation;
      import org.apache.spark.sql.sources.Filter;
      import org.apache.spark.sql.sources.PrunedFilteredScan;
      import org.apache.spark.sql.types.StructType;

      // CombinedReportHelper, MyPartition, SmartbixDatasourceRDD and
      // transformSchema(...) are helpers removed from this excerpt.
      public class MyDatasourceRelation extends BaseRelation implements PrunedFilteredScan {

          Map<String, String> parameters;
          SparkSession sparkSession;
          CombinedReportHelper helper;

          public MyDatasourceRelation() {
          }

          public MyDatasourceRelation(SQLContext sqlContext, Map<String, String> parameters) {
              this.parameters = parameters;
              this.sparkSession = sqlContext.sparkSession();
              this.helper = new CombinedReportHelper(parameters); // don't care
              this.helper.setRowsPerPage(1);
          }

          @Override
          public SQLContext sqlContext() {
              return this.sparkSession.sqlContext();
          }

          @Override
          public StructType schema() {
              StructType structType = transformSchema(helper.getFields(), helper.getFirst());
              //helper.close();
              System.out.println("get schema: " + structType);
              return structType;
          }

          @Override
          public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
              System.out.println("build scan:");
              int totalRow = helper.getTotalRow();
              Partition[] partitions = getPartitions(totalRow, parameters);
              System.out.println("got partitions: " + partitions.length + ", total rows: " + totalRow);
              return new SmartbixDatasourceRDD(sparkSession.sparkContext(), partitions, parameters);
          }

          // Split the result set into fixed-size row ranges, one Partition per range.
          private Partition[] getPartitions(int totalRow, Map<String, String> parameters) {
              int step = 1000000;
              int numOfPartitions = (totalRow + step - 1) / step; // ceiling division
              Partition[] partitions = new Partition[numOfPartitions];
              for (int i = 0; i < numOfPartitions; i++) {
                  int start = i * step + 1; // 1-based first row of this range
                  partitions[i] = new MyPartition(null, i, start, start + step);
              }
              return partitions;
          }
      }
      
      ---------- The above is my code; some irrelevant details have been removed. ----------
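
      (Side note: a BaseRelation with this constructor shape is normally created by a
      RelationProvider, which is part of what was removed from the excerpt. A minimal
      sketch of that wiring, assuming a hypothetical DefaultSource class, could look
      like this:)

      import java.util.Map;

      import org.apache.spark.sql.SQLContext;
      import org.apache.spark.sql.sources.BaseRelation;
      import org.apache.spark.sql.sources.RelationProvider;

      // Hypothetical provider class; Spark locates it via the name passed to
      // spark.read().format(...).
      public class DefaultSource implements RelationProvider {
          @Override
          public BaseRelation createRelation(
                  SQLContext sqlContext,
                  scala.collection.immutable.Map<String, String> parameters) {
              // Spark hands over a Scala map; convert it for the Java-side relation.
              Map<String, String> javaParams =
                  scala.collection.JavaConverters.mapAsJavaMapConverter(parameters).asJava();
              return new MyDatasourceRelation(sqlContext, javaParams);
          }
      }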

      For reference, the trait in question is:

      trait PrunedFilteredScan {
        def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
      }

       

      If I implement this trait, I find that the requiredColumns parameter is different every time. Why does the order differ between calls?
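
      (From the PrunedFilteredScan contract, the order of requiredColumns simply
      reflects whatever projection the planner requests for that particular scan, so
      it should be treated as arbitrary; the returned rows only have to follow the
      order given in that call. A relation can stay order-agnostic by resolving each
      column by name, roughly as in the sketch below, where the project helper and
      the fullRow record are hypothetical:)

      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.types.StructType;

      // Build the output row in exactly the order Spark asked for in this call,
      // looking each column up by name instead of assuming a fixed position.
      static Row project(Row fullRow, String[] requiredColumns, StructType fullSchema) {
          Object[] values = new Object[requiredColumns.length];
          for (int i = 0; i < requiredColumns.length; i++) {
              values[i] = fullRow.get(fullSchema.fieldIndex(requiredColumns[i]));
          }
          return RowFactory.create(values);
      }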

      You can reproduce this with spark.read.jdbc against a local MySQL database: set a breakpoint at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#buildScan (JDBCRelation.scala:130) and inspect the parameter.
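
      For example, something along these lines (connection settings are placeholders)
      makes the changing parameter visible:

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;

      SparkSession spark = SparkSession.builder()
          .appName("repro")
          .master("local")
          .getOrCreate();

      Dataset<Row> df = spark.read()
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test")
          .option("dbtable", "some_table")
          .option("user", "root")
          .option("password", "****")
          .load();

      // Each query plans a separate scan, so buildScan receives a fresh
      // requiredColumns array; its order follows the requested projection
      // and can differ between these two calls.
      df.select("b", "a").show();
      df.select("a", "b").show();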

      The attachment is my screenshot.

      Attachments

        1. QQ图片20181015182502.jpg
          118 kB
          huanghuai

          People

            Assignee: Unassigned
            Reporter: huanghuai
