Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
2.28.0
None
Description
Hi Team,
We are getting the error below:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
Our goal is to load a file into a database. We tried the following approach:
@SuppressWarnings("unchecked")
public static void main(String[] args) {
    PCSI02AOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
    Pipeline p = Pipeline.create(options);
    PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
        .apply(ParDo.of(new GetRatePlanID()))
        .apply("Format Result", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
    data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
            .withUsername("abc")
            .withPassword("abc123"))
        .withCoder(StringUtf8Coder.of())
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
            @Override
            public void setParameters(KV<String, Iterable<Integer>> element, PreparedStatement preparedStatement) throws Exception {
                String[] range = element.getKey().split(",");
                preparedStatement.setInt(1, Integer.parseInt(range[0]));
            }
        })
        .withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
            ObjectMapper mapper = new ObjectMapper();
            ArrayNode arrayNode = mapper.createArrayNode();
            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                try {
                    ObjectNode objectNode = mapper.createObjectNode();
                    objectNode.put("column_name", resultSet.getMetaData().getColumnName(i));
                    objectNode.put("value", resultSet.getString(i));
                    arrayNode.add(objectNode);
                } catch (Exception e) {
                    throw e;
                }
            }
            return mapper.writeValueAsString(arrayNode);
        })
    );
    State result = p.run().waitUntilFinish();
    System.out.println(result);
}

private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] data = c.element().split(",");
        Integer plankey = Integer.parseInt(data[0]);
        String planid = data[1];
        c.output(KV.of(planid, plankey));
    }
}
Error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
    at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
    at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910)
Could you suggest how we can resolve this? If there is any reference for this, kindly share a link or snippets.
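A possible explanation, offered as a sketch rather than a confirmed diagnosis: in the code above, the "Format Result" step turns each KV into a String and the result is stored in a raw PCollection, while JdbcIO.readAll() and its parameter setter are declared for KV elements. The compiler cannot catch the mismatch through a raw type, so the cast fails only at run time inside setParameters. The example below reproduces that mechanism in plain Java without Beam. ParameterSetter and readAll here are hypothetical stand-ins (not Beam APIs), List stands in for PCollection, and Map.Entry stands in for KV.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RawTypeCastDemo {

    /** Hypothetical stand-in for a typed callback such as a parameter setter. */
    interface ParameterSetter<T> {
        String describe(T element);
    }

    /** Hypothetical stand-in for a transform that hands each element to the setter. */
    static <T> List<String> readAll(List<T> input, ParameterSetter<T> setter) {
        List<String> out = new ArrayList<>();
        for (T element : input) {
            out.add(setter.describe(element));
        }
        return out;
    }

    /** A setter declared for key/value pairs, like the one in the report. */
    static final ParameterSetter<Map.Entry<String, Integer>> SETTER =
        new ParameterSetter<Map.Entry<String, Integer>>() {
            @Override
            public String describe(Map.Entry<String, Integer> element) {
                return element.getKey() + "=" + element.getValue();
            }
        };

    /** Raw collection holding Strings: compiles with a warning, fails at run time. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String runWithRawCollection() {
        List raw = new ArrayList();
        raw.add("planA,1"); // already formatted as a String, not a pair
        try {
            readAll(raw, SETTER);
            return "no error";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    /** Typed collection holding pairs: the element type matches the setter. */
    static List<String> runWithTypedCollection() {
        List<Map.Entry<String, Integer>> typed = new ArrayList<>();
        typed.add(new AbstractMap.SimpleEntry<>("planA", 1));
        return readAll(typed, SETTER);
    }

    public static void main(String[] args) {
        System.out.println(runWithRawCollection());
        System.out.println(runWithTypedCollection());
    }
}
```

If this is indeed the cause, two directions seem worth trying: declare data1 with its element type and pass KV elements to readAll (dropping the "Format Result" step and typing the setter to match what GetPlanID emits), or keep the String elements and declare readAll and the setter for String, parsing the value inside setParameters.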