BEAM-12005

Error loading a file into a database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)


    Details

      Description

      Hi Team,

      We are getting the following error:
      org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV

      Our goal is to load a file into a database. We tried the following approach:

      @SuppressWarnings("unchecked")
      	public static void main(String[] args) {
      		PCSI02AOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
      		Pipeline p = Pipeline.create(options);
      
      
      		PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
      					.apply(ParDo.of(new GetRatePlanID()))
      					.apply("Format Result", 
      							MapElements.into(TypeDescriptors.strings())
      							.via((KV<String, Integer> ABC) -> ABC.getKey() + "," + ABC.getValue()));
      
      
      		data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
      				.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
      						.create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
      						.withUsername("abc")
      						.withPassword("abc123"))
      				.withCoder(StringUtf8Coder.of())
      				.withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
      					@Override
      					public void setParameters(KV<String, Iterable<Integer>> element,
      							PreparedStatement preparedStatement) throws Exception {
      						String[] range = element.getKey().split(",");
      						preparedStatement.setInt(1, Integer.parseInt(range[0]));
      					}
      
      
      				}).withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
      				.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
      					ObjectMapper mapper = new ObjectMapper();
      					ArrayNode arrayNode = mapper.createArrayNode();
      					for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
      						try {
      							ObjectNode objectNode = mapper.createObjectNode();
      							objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
      							objectNode.put("value",resultSet.getString(i));
      							arrayNode.add(objectNode);
      						} catch (Exception e) {
      							throw e;
      						}
      					}
      					return mapper.writeValueAsString(arrayNode);
      				})
      		)
      		;
      
      
      		State result = p.run().waitUntilFinish();
      		System.out.println(result);
      	}
      
      
      private static class GetRatePlanID extends DoFn<String, KV<String, Integer>> {
      		@ProcessElement
      		public void processElement(ProcessContext c)
      		{
      			String[] data = c.element().split(",");
      			Integer plankey = Integer.parseInt(data[0]);
      			String planid = data[1];
      			c.output(KV.of(planid, plankey));
      		}
      	}

       

      Error:

      Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) 
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) 
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) 
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) 
          at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
      Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV 
          at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1) 
          at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910)

       
      Could you suggest how we can resolve this? If there is a reference or example for this use case, please share a link or code snippet.
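
A possible explanation, offered as a sketch rather than a confirmed diagnosis: `data1` is declared as a raw `PCollection`, and its last step (`MapElements.into(TypeDescriptors.strings())`) emits `String` elements, while `JdbcIO.readAll()` is parameterized with `KV<String, Iterable<Integer>>`. With raw types the compiler cannot flag the mismatch, so the cast only fails at runtime inside `setParameters`. The plain-Java example below reproduces that mechanism without Beam. `ParameterSetter`, `applyRaw`, `KV_SETTER`, and `STRING_SETTER` are hypothetical stand-ins, with `Map.Entry` playing the role of `KV`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class RawTypeCastDemo {

    /** Hypothetical stand-in for a typed parameter setter such as JdbcIO.PreparedStatementSetter<T>. */
    public interface ParameterSetter<T> {
        String describe(T element);
    }

    /** Hypothetical stand-in for a transform that hands each element to the setter through a raw type. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String applyRaw(Object element, ParameterSetter setter) {
        // Raw type: the compiler cannot verify that the element matches the setter's T.
        return setter.describe(element);
    }

    /** Setter that expects key/value pairs (Map.Entry stands in for KV). */
    public static final ParameterSetter<Map.Entry<String, Integer>> KV_SETTER =
            new ParameterSetter<Map.Entry<String, Integer>>() {
                @Override
                public String describe(Map.Entry<String, Integer> element) {
                    return element.getKey();
                }
            };

    /** Setter that expects the comma-separated String and parses it itself. */
    public static final ParameterSetter<String> STRING_SETTER = line -> line.split(",")[0];

    /** Returns "ok" if the call succeeds, or "ClassCastException" if the element type is wrong. */
    public static String outcome(Object element, ParameterSetter<?> setter) {
        try {
            applyRaw(element, setter);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A String fed to a setter that expects key/value pairs: mismatch surfaces at runtime.
        System.out.println("String -> KV setter:     " + outcome("planA,7", KV_SETTER));
        // The same String fed to a setter declared for String.
        System.out.println("String -> String setter: " + outcome("planA,7", STRING_SETTER));
        // A key/value pair fed to the key/value setter.
        System.out.println("Entry  -> KV setter:     " + outcome(new SimpleEntry<>("planA", 7), KV_SETTER));
    }
}
```

If this reading is right, either declaring the element type of `readAll()` as `String` (and parsing the line in the setter, as `STRING_SETTER` does) or dropping the "Format Result" step so that `KV` elements reach `readAll()` should make the types line up. Declaring `data1` as `PCollection<String>` instead of a raw `PCollection` would let the compiler report such mismatches.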

            People

            • Assignee: Unassigned
            • Reporter: khgaura Gaurav Khandelwal
