Description
Here is my demo code:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3307/libra_stat")
        .withUsername("root")
        .withPassword("123456"))
    .withQuery("select id, game_id from test_tb")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
            System.out.println(resultSet.getInt(1));
            System.out.println(resultSet.getString(2));
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);
I ran this demo with the DirectRunner and got a NotSerializableException, as follows:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn@68f4865
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:325)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:272)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:165)
    at com.xiaomi.huyu.processor.demo.SqlDemo.run(SqlDemo.java:30)
    at com.xiaomi.huyu.processor.demo.SqlDemo.main(SqlDemo.java:21)
Caused by: java.io.NotSerializableException: com.xiaomi.huyu.processor.demo.SqlDemo
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 11 more
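One possible reading of the trace: the "Caused by" line names SqlDemo itself rather than the RowMapper. That would be consistent with the anonymous RowMapper holding an implicit reference to the enclosing SqlDemo instance, assuming SqlDemo.run is an instance method. The sketch below reproduces that effect with plain Java serialization and no Beam involved. CaptureDemo, Mapper, StaticMapper, and canSerialize are hypothetical names used only for illustration, and the anonymous class reads an instance field so that the capture does not depend on the compiler version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the pipeline class; deliberately NOT Serializable.
public class CaptureDemo {

    // Hypothetical stand-in for a serializable callback such as a row mapper.
    public interface Mapper extends Serializable {
        String map(int id);
    }

    private final String label = "row";

    // Anonymous class created in an instance method: it carries a hidden
    // reference to the enclosing CaptureDemo instance. Reading `label`
    // makes that capture explicit (an assumption of this sketch).
    public Mapper anonymousMapper() {
        return new Mapper() {
            @Override
            public String map(int id) {
                return label + "-" + id;
            }
        };
    }

    // Static nested class: no reference to any CaptureDemo instance.
    public static class StaticMapper implements Mapper {
        @Override
        public String map(int id) {
            return "row-" + id;
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println("anonymous: " + canSerialize(demo.anonymousMapper()));  // prints "anonymous: false"
        System.out.println("static nested: " + canSerialize(new StaticMapper()));  // prints "static nested: true"
    }
}
```

If this reading is right, moving the RowMapper into a static nested class or a top-level class might keep SqlDemo out of the serialized ReadFn. I have not verified this against the pipeline above.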
Any suggestions and comments are welcome, thanks a lot!