Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-3703

java.io.IOException: KafkaWriter : failed to send 1 records (since last report)

Details

    Description

      I am trying to read from file and write to Kafka in google cloud kafka and getting following error:

       

      org.apache.beam.sdk.util.UserCodeException: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
         at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
         at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
         at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
         at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
         at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
         at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
         at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: KafkaWriter : failed to send 1 records (since last report)
         at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.checkForFailures(KafkaIO.java:1639)
         at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.processElement(KafkaIO.java:1581)
      Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
      

       

      .apply(KafkaIO.<String, String>_write_()
      .withBootstrapServers("ip1:9092,ip2:9092")
      .withTopic("feed")
      .withValueSerializer(StringSerializer.class)
      .withKeySerializer(StringSerializer.class)
              //.updateProducerProperties(ImmutableMap.of("security.protocol","PLAINTEXT"))
              //.updateProducerProperties(ImmutableMap.of("sasl.mechanism","PLAIN"))
      .values() // writes values to Kafka with default key
      

       
      Kafka is running on google cloud bitnami and I am using Flink runner

      How do I pass security information to Kafka IO?

      Attachments

        Activity

          People

            rangadi Raghu Angadi
            jsihota jagdeep sihota
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: