Beam / BEAM-12494

Dataflow Kafka Job not triggering for external subnet

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Won't Fix
    • Affects Version/s: 2.28.0
    • Fix Version/s: Not applicable
    • Component/s: io-java-kafka
    • Labels: None
    • Environment: IntelliJ community version, Maven, Windows, Dataflow version 2.28.0
    • Flags: Patch, Important

    Description

      Hello,

      Our team is facing an issue launching a streaming Dataflow Kafka job from IntelliJ when the Kafka cluster is hosted on a private subnet.

      The hypothesis is that during graph construction time [0], Beam locally tries to execute the code and check all the connections. In our case, we don't have access to the subnet through IntelliJ or through the Cloud Console. We do have access when a Compute Engine instance is created within that subnet.
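      To be clear about where the failure shows up: the time-out is thrown on the machine that launches the job, not on a Dataflow worker. Below is a minimal, hedged sketch of the launch path (the option wiring is illustrative, not our exact code); the error surfaces inside pipeline.run(), while the Dataflow runner is still building the job graph locally.

      // Hedged sketch; the classes are standard Beam/Dataflow APIs, but the
      // wiring here is illustrative rather than our exact code.
      DataflowPipelineOptions options =
              PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
      options.setRunner(DataflowRunner.class);
      options.setStreaming(true);

      Pipeline pipeline = Pipeline.create(options);
      // ... apply the KafkaIO.read() transform shown below ...

      // The time-out surfaces here, on the launch machine, while the runner
      // translates the pipeline, before any Dataflow job exists.
      pipeline.run();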

      We reached out to Google Support and they suggested that we raise a defect with you. The following code throws a time-out error when we execute it through IntelliJ.

      pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
              .withConsumerConfigUpdates(propertyBuilder)
              .withConsumerConfigUpdates(
                      ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
              )
              .withBootstrapServers(options.getBootstrapServers())
              .withTopics(topicsList)
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .commitOffsetsInFinalize()
             // .withMaxNumRecords(5)
      );
      

      But if we uncomment

      .withMaxNumRecords(5)

      the code works perfectly and we are able to spin up a Dataflow job in the desired subnet to ingest the Kafka stream.

      pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
              .withConsumerConfigUpdates(propertyBuilder)
              .withConsumerConfigUpdates(
                      ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
              )
              .withBootstrapServers(options.getBootstrapServers())
              .withTopics(topicsList)
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .commitOffsetsInFinalize()
              .withMaxNumRecords(5)
      );
      

      The issue with the above code is that the Dataflow job stops after ingesting the given number of records and behaves like a batch ingestion instead of a streaming one, which we don't want.
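      To make the batch-versus-streaming point concrete, here is a minimal sketch (placeholder broker and topic names, not our real configuration) showing that adding withMaxNumRecords changes the boundedness of the resulting PCollection:

      // Minimal sketch with placeholder values ("broker:9092", "my-topic").
      PCollection<KafkaRecord<String, String>> records =
              pipeline.apply(KafkaIO.<String, String>read()
                      .withBootstrapServers("broker:9092")
                      .withTopic("my-topic")
                      .withKeyDeserializer(StringDeserializer.class)
                      .withValueDeserializer(StringDeserializer.class)
                      .withMaxNumRecords(5));

      // Prints BOUNDED; without withMaxNumRecords it prints UNBOUNDED,
      // i.e. a true streaming read.
      System.out.println(records.isBounded());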

      Google support team hypothesis:

      The current hypothesis is that the issue is happening in `KafkaUnboundedSource.split()` [1], which fails because it is unable to fetch the topic metadata.
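      For reference, the relevant work in split() boils down to partition discovery against the brokers, roughly sketched below (a simplification, not the actual Beam source; consumerConfig and topics stand in for the consumer configuration and topic list). It is this metadata fetch that times out when the launching machine cannot reach the subnet.

      // Rough simplification of the work KafkaUnboundedSource.split() must do:
      // enumerate each topic's partitions, which requires a live broker connection.
      try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
          for (String topic : topics) {
              List<PartitionInfo> partitions = consumer.partitionsFor(topic);
              // ... distribute the partitions across the desired number of splits ...
          }
      }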

      The first point is that `withMaxNumRecords` is intended for testing [2]; when it is specified, the unbounded read is converted into a bounded read by `BoundedReadFromUnboundedSource` [3], but without `withMaxNumRecords` the pipeline remains unbounded.

      When the pipeline is bounded (i.e. `withMaxNumRecords` is specified), `split()` happens on the Dataflow worker in `SplitFn` [4]. Since it runs on Dataflow, it has no issue connecting to Kafka.

      But when the pipeline is unbounded (`withMaxNumRecords` commented out), `split()` is called while the pipeline is built locally, at the graph construction phase [5][6], at which point it does not have access to Kafka.
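      A quick way to confirm this independently of Beam is to test broker reachability from the launching machine with the plain Kafka admin client. The sketch below uses a placeholder bootstrap address and arbitrary timeouts; it fails with the same kind of time-out whenever the machine has no route into the subnet.

      // Hedged diagnostic sketch: reproduce the graph-construction failure outside
      // Beam. "broker:9092" is a placeholder for options.getBootstrapServers().
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
      props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
      try (AdminClient admin = AdminClient.create(props)) {
          Set<String> topics = admin.listTopics().names().get(15, TimeUnit.SECONDS);
          System.out.println("Brokers reachable, topics: " + topics);
      } catch (Exception e) {
          // Same symptom the local graph construction hits: metadata cannot be fetched.
          System.out.println("Brokers not reachable from this machine: " + e);
      }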

       

      [0] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
      [1] https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57
      [2] https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-
      [3] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193
      [4] https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169
      [5] https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87
      [6] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job
       

      Attachments

        1. TimeOutLogs_KafkaIngestion.txt (29 kB) - Jasminder pal singh sehgal
        2. SuccessfulJobRun-KafkaIngestion.txt (27 kB) - Jasminder pal singh sehgal
        3. LogsStreamingEngine.txt (28 kB) - Jasminder pal singh sehgal
        4. image-2021-06-21-15-00-09-851.png (123 kB) - Jasminder pal singh sehgal
        5. image-2021-06-20-22-23-14-052.png (109 kB) - Jasminder pal singh sehgal
        6. image-2021-06-20-22-20-24-363.png (90 kB) - Jasminder pal singh sehgal
        7. image-2021-06-16-16-55-57-509.png (34 kB) - Jasminder pal singh sehgal
        8. image-2021-06-16-16-54-25-161.png (29 kB) - Jasminder pal singh sehgal
        9. CodeSnippet.JPG (47 kB) - Jasminder pal singh sehgal

          People

            Assignee: Unassigned
            Reporter: Jasminder pal singh sehgal (sehgaljps)
            Votes: 0
            Watchers: 3
