Details
Type: Question
Status: Resolved
Priority: Minor
Resolution: Invalid
Affects Version/s: 2.3.2
None
None
Environment: local mode
Description
public class MyDatasourceRelation extends BaseRelation implements PrunedFilteredScan {

    Map<String, String> parameters;
    SparkSession sparkSession;
    CombinedReportHelper helper;

    public MyDatasourceRelation() {
    }

    public MyDatasourceRelation(SQLContext sqlContext, Map<String, String> parameters) {
        this.parameters = parameters;
        this.sparkSession = sqlContext.sparkSession();
        this.helper = new CombinedReportHelper(parameters);
        //don't care
        this.helper.setRowsPerPage(1);
    }

    @Override
    public SQLContext sqlContext() {
        return this.sparkSession.sqlContext();
    }

    @Override
    public StructType schema() {
        StructType structType = transformSchema(helper.getFields(), helper.getFirst());
        //helper.close();
        System.out.println("get schema: " + structType);
        return structType;
    }

    @Override
    public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
        System.out.println("build scan:");
        int totalRow = helper.getTotalRow();
        Partition[] partitions = getPartitions(totalRow, parameters);
        System.out.println("get partition:" + partitions.length + " total row:" + totalRow);
        return new SmartbixDatasourceRDD(sparkSession.sparkContext(), partitions, parameters);
    }

    private Partition[] getPartitions(int totalRow, Map<String, String> parameters) {
        int step = 1000000;
        int numOfPartitions = (totalRow + step - 1) / step;
        Partition[] partitions = new Partition[numOfPartitions];
        for (int i = 0; i < numOfPartitions; i++) {
            int start = i * step + 1;
            partitions[i] = new MyPartition(null, i, start, start + step);
        }
        return partitions;
    }
}
Above is my code; some irrelevant parts have been removed.
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
When I implement this trait, I find that the requiredColumns parameter is different every time buildScan is called. Why does the order of the columns differ?
To reproduce, use spark.read.jdbc to connect to a local MySQL database and set a breakpoint at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#buildScan(scala:130)
to inspect this parameter.
The attachment is my screenshot.
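For context, a minimal sketch of how a relation could stay independent of the order in which requiredColumns arrives. As far as I understand the PrunedFilteredScan contract, the rows returned by buildScan should contain the requested columns in the order given by requiredColumns, so the order need not match the schema and may vary between queries. The class RequiredColumnsProjection and the method project below are hypothetical names used only for illustration; they are plain Java and not part of Spark or of my code above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RequiredColumnsProjection {

    // Hypothetical helper (not a Spark API): picks the values of one full record
    // by column name, in exactly the order listed in requiredColumns.
    public static Object[] project(String[] schemaFields, Object[] fullRecord, String[] requiredColumns) {
        // Map each schema field name to its position in the full record.
        Map<String, Integer> indexByName = new HashMap<>();
        for (int i = 0; i < schemaFields.length; i++) {
            indexByName.put(schemaFields[i], i);
        }
        Object[] projected = new Object[requiredColumns.length];
        for (int i = 0; i < requiredColumns.length; i++) {
            Integer idx = indexByName.get(requiredColumns[i]);
            if (idx == null) {
                throw new IllegalArgumentException("Unknown column: " + requiredColumns[i]);
            }
            projected[i] = fullRecord[idx];
        }
        return projected;
    }

    public static void main(String[] args) {
        String[] schemaFields = {"id", "name", "age"};
        Object[] record = {1, "alice", 30};
        // The same record projected for two different column orders.
        System.out.println(Arrays.toString(project(schemaFields, record, new String[] {"age", "id"}))); // prints [30, 1]
        System.out.println(Arrays.toString(project(schemaFields, record, new String[] {"id", "age"}))); // prints [1, 30]
    }
}
```

In a real relation, the projected array would presumably be wrapped in a Row (for example via RowFactory.create) inside the RDD's compute method, with requiredColumns passed through to the RDD; that wiring is an assumption and is left out of the sketch.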