Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Version: 2.33.0
openjdk version "11.0.10" 2021-01-19
macOS 11.1
Description
Error
I am trying to read a table in a PostgreSQL database. One of the columns is defined as
Numeric(15, 2)
At runtime I am getting:
Caused by: java.lang.IllegalArgumentException: Expected BigDecimal base to be null or have precision = 15 (was 6), scale = 2 (was 2)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:477)
    at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:268)
    at org.apache.beam.sdk.io.jdbc.LogicalTypes$FixedPrecisionNumeric.toInputType(LogicalTypes.java:246)
    at org.apache.beam.sdk.io.jdbc.SchemaUtil.lambda$createLogicalTypeExtractor$ca0ab2ec$1(SchemaUtil.java:289)
    at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:354)
    at org.apache.beam.sdk.io.jdbc.SchemaUtil$BeamRowMapper.mapRow(SchemaUtil.java:332)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:1172)
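It seems that when JdbcIO reads a numeric column, the returned value can have a smaller precision than the one declared in the DDL. Here the stored value 5755.94 has precision 6, while the column is declared with precision 15, so the exact-match check on precision fails even though the scale matches.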
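The mismatch can be seen with plain java.math.BigDecimal, without Beam or a database. The sketch below is only an illustration: matchesExactly and fitsWithin are hypothetical helpers written for this report, not Beam APIs. The first mirrors the equality check implied by the error message; the second is one possible lenient check, assuming a value should be accepted whenever it fits in the declared numeric(precision, scale).

```kotlin
import java.math.BigDecimal

// Hypothetical helper: the strict check implied by the error message,
// where precision and scale must both equal the declared values.
fun matchesExactly(value: BigDecimal, precision: Int, scale: Int): Boolean =
    value.precision() == precision && value.scale() == scale

// Hypothetical helper: accept any value that fits in numeric(precision, scale),
// i.e. no more fractional digits and no more integer digits than declared.
fun fitsWithin(value: BigDecimal, precision: Int, scale: Int): Boolean =
    value.scale() <= scale &&
        value.precision() - value.scale() <= precision - scale

fun main() {
    val balance = BigDecimal("5755.94")
    // The precision of a BigDecimal is the digit count of its unscaled value (575594).
    println("precision=${balance.precision()}, scale=${balance.scale()}")
    println("strict check:  ${matchesExactly(balance, 15, 2)}")
    println("lenient check: ${fitsWithin(balance, 15, 2)}")
}
```

With the value from the reproduction, the strict check fails and the lenient one passes, which matches the "precision = 15 (was 6)" part of the exception.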
How to reproduce
import org.apache.beam.sdk.io.jdbc.JdbcIO
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.ProcessFunction
import org.apache.beam.sdk.values.TypeDescriptors
import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Test
import org.testcontainers.containers.PostgreSQLContainer
import java.math.BigDecimal
import java.sql.Connection
import java.sql.DriverManager

class PostgresNumericBugTest {
    companion object {
        val pgContainer = PostgreSQLContainer<Nothing>("postgres:14.0")

        @BeforeClass
        @JvmStatic
        fun start() {
            pgContainer.start()
            getConnection().use {
                val stmt = it.createStatement()
                stmt.execute(
                    """create table account (
                        id integer not null primary key,
                        balance numeric(15, 2)
                    )
                    """.trimIndent()
                )
                stmt.execute("insert into account(id, balance) values(1, 5755.94)")
            }
        }

        @AfterClass
        @JvmStatic
        fun stop() {
            pgContainer.stop()
        }

        private fun getConnection(): Connection {
            return DriverManager.getConnection(
                pgContainer.jdbcUrl, pgContainer.username, pgContainer.password
            )
        }

        private fun getDataSourceConfiguration(): JdbcIO.DataSourceConfiguration =
            JdbcIO.DataSourceConfiguration.create(
                pgContainer.driverClassName,
                pgContainer.jdbcUrl
            )
                .withUsername(pgContainer.username)
                .withPassword(pgContainer.password)
    }

    @Test
    fun readNumeric() {
        val pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false)

        val pBalance = pipeline
            .apply(
                "Read Account from DB",
                JdbcIO
                    .readRows()
                    .withQuery("select balance from account")
                    .withDataSourceConfiguration(getDataSourceConfiguration())
            )
            .apply(
                "Get Balance",
                MapElements.into(TypeDescriptors.bigdecimals()).via(ProcessFunction { it.getDecimal(0) })
            )

        PAssert.that(pBalance).containsInAnyOrder(BigDecimal("5755.94"))

        pipeline.run()
    }
}