Bahir (Retired) / BAHIR-301

Unable to persist Oracle's Number(16,9) using sql-streaming-jdbc


    Description

      While trying to persist Oracle's NUMBER(16,9) using sql-streaming-jdbc, the error below occurs.

      Java Code Snippet:

      Dataset<Row> rawData = spark.readStream()
              .format("csv")
              .schema(getSchema())
              .csv("/rates-streaming/*.csv");

      rawData.createOrReplaceTempView("testRates");
      rawData.printSchema();

      Dataset<Row> results = spark.sql("select rate from testRates");
      results.printSchema();

      StreamingQuery query = results.writeStream()
              .format("streaming-jdbc")
              .outputMode(OutputMode.Append())
              .option(JDBCOptions.JDBC_URL(), datasource)
              .option(JDBCOptions.JDBC_TABLE_NAME(), "TABLENAME")
              .option(JDBCOptions.JDBC_DRIVER_CLASS(), "oracle.jdbc.driver.OracleDriver")
              .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), "100")
              .option("user", username)
              .option("password", password)
              .option("truncate", false)
              .trigger(Trigger.ProcessingTime("15 seconds"))
              .start();
      query.awaitTermination();

      public StructType getSchema() {
          List<StructField> fields = new ArrayList<>();
          fields.add(DataTypes.createStructField("rate", DataTypes.createDecimalType(16, 9), true));
          return DataTypes.createStructType(fields);
      }

      Logs:

      java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.math.BigDecimal
          at org.apache.spark.sql.Row.getDecimal(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.sql.Row.getDecimal$(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.sql.catalyst.expressions.GenericRow.getDecimal(rows.scala:166) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]
          at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12(JdbcUtil.scala:102) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
          at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12$adapted(JdbcUtil.scala:101) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
          at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.doWriteAndResetBuffer(JdbcStreamWriter.scala:174) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
          at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:156) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
          at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:79) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) ~[spark-sql_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) ~[spark-core_2.12-2.4.0.jar:2.4.0]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_291]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_291]
          at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
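      The stack trace shows `Row.getDecimal` applying an unchecked cast to `java.math.BigDecimal` on a value that is actually Spark's internal `org.apache.spark.sql.types.Decimal` wrapper. The plain-Java sketch below reproduces the failure mode with a hypothetical stand-in class (`InternalDecimal` is an assumption for illustration, not a Spark type); it is not the Bahir fix, only a demonstration of why the cast throws and why an explicit conversion does not.

```java
import java.math.BigDecimal;

public class CastDemo {
    // Hypothetical stand-in for Spark's internal Decimal: it *wraps* a
    // BigDecimal but is not a subclass of java.math.BigDecimal.
    static final class InternalDecimal {
        private final BigDecimal value;
        InternalDecimal(BigDecimal value) { this.value = value; }
        BigDecimal toJavaBigDecimal() { return value; }
    }

    public static void main(String[] args) {
        // A row field stored as Object, analogous to GenericRow holding
        // values untyped. Here it contains the internal wrapper.
        Object field = new InternalDecimal(new BigDecimal("1234567.123456789"));

        try {
            // Row.getDecimal-style unchecked cast: fails because the stored
            // object is the wrapper, not a java.math.BigDecimal.
            BigDecimal rate = (BigDecimal) field;
            System.out.println("cast succeeded: " + rate);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException as in the log above");
        }

        // Explicitly unwrapping instead of casting succeeds.
        BigDecimal rate = ((InternalDecimal) field).toJavaBigDecimal();
        System.out.println("converted: " + rate);
    }
}
```

      This suggests the writer path in JdbcUtil should convert the internal Decimal to a BigDecimal rather than cast the row value directly.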

      People

          Assignee: Unassigned
          Reporter: vijaymk (Vijay)
