Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.3.2
Fix Version/s: None
Description
With a vectorized DSv2 reader, i.e. one that implements SupportsScanColumnarBatch, a scan with around 1000 or more columns fails with:
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0" grows beyond 64 KB
  at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:990)
  at org.codehaus.janino.CodeContext.write(CodeContext.java:899)
  at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:1016)
  at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:11911)
  at org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:3675)
  at org.codehaus.janino.UnitCompiler.access$5500(UnitCompiler.java:212)
I can see from the logs that Spark tries to disable whole-stage codegen, but it keeps failing on each retry even after that (see the config sketch after the log below).
19/10/07 20:49:35 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=0):
*(0) DataSourceV2Scan [column_0#3558, column_1#3559, column_2#3560, column_3#3561, column_4#3562, column_5#3563, column_6#3564, column_7#3565, column_8#3566, column_9#3567, column_10#3568, column_11#3569, column_12#3570, column_13#3571, column_14#3572, column_15#3573, column_16#3574, column_17#3575, column_18#3576, column_19#3577, column_20#3578, column_21#3579, column_22#3580, column_23#3581, ... 976 more fields], com.shubham.reader.MyDataSourceReader@5c7673b8
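For reference, a minimal standalone driver (hypothetical class name ReproDriver, not part of the original report) that sets the existing spark.sql.codegen.wholeStage and spark.sql.codegen.fallback confs up front before running the same repro. Since the log above shows the automatic fallback itself failing, this is only a sketch of how to force those confs explicitly and may not avoid the error:

import org.apache.spark.sql.SparkSession;

public class ReproDriver {
  public static void main(String[] args) {
    // Hypothetical driver; the format/option values match the repro below.
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("dsv2-wide-scan-repro")
        // Existing Spark SQL confs; may or may not avoid the Janino 64 KB failure here.
        .config("spark.sql.codegen.wholeStage", "false")
        .config("spark.sql.codegen.fallback", "true")
        .getOrCreate();

    spark.read()
        .format("com.shubham.MyDataSource")
        .option("num_columns", "1000")
        .option("num_rows_per_reader", "2")
        .option("num_readers", "1")
        .load()
        .show();

    spark.stop();
  }
}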
Repro code for a simple reader:
package com.shubham.reader;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;
  private int numCols = 10;
  private int numRows = 10;
  private int numReaders = 1;

  public MyDataSourceReader(Map<String, String> options) {
    initOptions(options);
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
  }

  // Reads num_columns, num_rows_per_reader and num_readers from the source options.
  private void initOptions(Map<String, String> options) {
    String numColumns = options.get("num_columns");
    if (numColumns != null) {
      numCols = Integer.parseInt(numColumns);
    }
    String numRowsOption = options.get("num_rows_per_reader");
    if (numRowsOption != null) {
      numRows = Integer.parseInt(numRowsOption);
    }
    String readersOption = options.get("num_readers");
    if (readersOption != null) {
      numReaders = Integer.parseInt(readersOption);
    }
  }

  // Builds a wide schema of numCols nullable int columns named column_0, column_1, ...
  @Override
  public StructType readSchema() {
    final String colPrefix = "column_";
    StructField[] fields = new StructField[numCols];
    for (int i = 0; i < numCols; i++) {
      fields[i] = new StructField(colPrefix + i, DataTypes.IntegerType, true, Metadata.empty());
    }
    schema = new StructType(fields);
    return schema;
  }

  // Returns no reader factories; the failure occurs during planning/codegen, before any data is read.
  @Override
  public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("MyDataSourceReader.createDataReaderFactories: " + numReaders);
    return new ArrayList<>();
  }
}
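For completeness, the com.shubham.MyDataSource entry point referenced by the spark.read call below is not shown in the report; a minimal wrapper implementing DataSourceV2 and ReadSupport (an assumed sketch, not taken from the report) could look like:

package com.shubham;

import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;

import com.shubham.reader.MyDataSourceReader;

// Hypothetical entry point (not part of the original report): lets
// spark.read.format("com.shubham.MyDataSource") resolve to the reader above.
public class MyDataSource implements DataSourceV2, ReadSupport {
  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    // DataSourceOptions wraps the .option(...) values; asMap() hands them to the reader.
    return new MyDataSourceReader(options.asMap());
  }
}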
If I pass num_columns 1000 or greater, the issue appears.
spark.read.format("com.shubham.MyDataSource").option("num_columns", "1000").option("num_rows_per_reader", 2).option("num_readers", 1).load.show
Any fixes/workarounds for this?
SPARK-16845 and SPARK-17092 are resolved, but they don't appear to cover the vectorized path.