Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-13242

IllegalArgumentException while reading a Numeric column with a fixed precision

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • 2.33.0
    • 2.35.0
    • io-java-jdbc
    • openjdk version "11.0.10" 2021-01-19
      macOS 11.1

    Description

      Error

       

      I am trying to read a table in a Postgresql database. One of columns has a type definition 

      Numeric(15, 2)

       

      At runtime I am getting:

      Caused by: java.lang.IllegalArgumentException: Expected BigDecimal base to be null or have precision = 15 (was 6), scale = 2 (was 2)	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:477)	at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:268)	at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:246)	at org.apache.beam.sdk.io.jdbc.SchemaUtil.lambda$createLogicalTypeExtractor$ca0ab2ec$1(SchemaUtil.java:289)	at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:354)	at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:332)	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1172) 

       

      It seems that when JdbcIO reads a numeric column it might have smaller precision/scale than defined in DDL.

       

      How to reproduce

       

      import org.apache.beam.sdk.io.jdbc.JdbcIO
      import org.apache.beam.sdk.testing.PAssert
      import org.apache.beam.sdk.testing.TestPipeline
      import org.apache.beam.sdk.transforms.MapElements
      import org.apache.beam.sdk.transforms.ProcessFunction
      import org.apache.beam.sdk.values.TypeDescriptors
      import org.junit.AfterClass
      import org.junit.BeforeClass
      import org.junit.Test
      import org.testcontainers.containers.PostgreSQLContainer
      import java.math.BigDecimal
      import java.sql.Connection
      import java.sql.DriverManager
      
      class PostgresNumericBugTest {
          companion object {
              val pgContainer = PostgreSQLContainer<Nothing>("postgres:14.0")
      
              @BeforeClass
              @JvmStatic
              fun start() {
                  pgContainer.start()
      
                  getConnection().use {
                      val stmt = it.createStatement()
                      stmt.execute(
                          """create table account (
                                id integer not null primary key,
                                balance numeric(15, 2)
                              )
                          """.trimIndent()
                      )
                      stmt.execute("insert into account(id, balance) values(1, 5755.94)")
                  }
              }
      
              @AfterClass
              @JvmStatic
              fun stop() {
                  pgContainer.stop()
              }
      
              private fun getConnection(): Connection {
                  return DriverManager.getConnection(
                      pgContainer.jdbcUrl, pgContainer.username, pgContainer.password
                  )
              }
      
              private fun getDataSourceConfiguration(): JdbcIO.DataSourceConfiguration =
                  JdbcIO.DataSourceConfiguration.create(
                      pgContainer.driverClassName,
                      pgContainer.jdbcUrl
                  )
                      .withUsername(pgContainer.username)
                      .withPassword(pgContainer.password)
          }
      
          @Test
          fun readNumeric() {
              val pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false)
      
              val pBalance = pipeline
                  .apply(
                      "Read Account from DB",
                      JdbcIO
                          .readRows()
                          .withQuery("select balance from account")
                          .withDataSourceConfiguration(getDataSourceConfiguration())
                  )
                  .apply(
                      "Get Balance",
                      MapElements.into(TypeDescriptors.bigdecimals()).via(ProcessFunction { it.getDecimal(0) })
                  )
      
              PAssert.that(pBalance).containsInAnyOrder(BigDecimal("5755.94"))
      
              pipeline.run()
          }
      }
       

      Attachments

        Issue Links

          Activity

            People

              Vitaly Ivanov Vitaly Ivanov
              Borovikov Denis
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 4h 10m
                  4h 10m