Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
Spark-2.4.0
-
None
Description
While trying to persist Oracle's Number(16,9) using sql-streaming-jdbc, the following error occurs.
Java Code Snippet:
Dataset<Row> rawData = spark.readStream()
.format("csv").schema(getSchema())
.csv("/rates-streaming/*.csv");
rawData.createOrReplaceTempView("testRates");
rawData.printSchema();
results.printSchema();
Dataset<Row> results = spark.sql("select rate from testRates ");
StreamingQuery query = results.writeStream().outputMode("append").format("streaming-jdbc")
.outputMode(OutputMode.Append()).option(JDBCOptions.JDBC_URL(), datasource)
.option(JDBCOptions.JDBC_TABLE_NAME(), "TABLENAME")
.option(JDBCOptions.JDBC_DRIVER_CLASS(), "oracle.jdbc.driver.OracleDriver")
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), "100").option("user", username)
.option("password", password).option("truncate", false).trigger(Trigger.ProcessingTime("15 seconds")).start();
query.awaitTermination();
public StructType getSchema() {
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("rate", DataTypes.createDecimalType(16, 9), true));
StructType schema = DataTypes.createStructType(fields);
return schema;
}
Logs:
java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.math.BigDecimal
at org.apache.spark.sql.Row.getDecimal(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.sql.Row.getDecimal$(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.sql.catalyst.expressions.GenericRow.getDecimal(rows.scala:166) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12(JdbcUtil.scala:102) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12$adapted(JdbcUtil.scala:101) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.doWriteAndResetBuffer(JdbcStreamWriter.scala:174) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:156) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:79) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) ~[spark-core_2.12-2.4.0.jar:2.4.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_291]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]