  1. Beam
  2. BEAM-13129

[Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.30.0, 2.31.0, 2.32.0, 2.33.0
    • Fix Version/s: None
    • Component/s: extensions-java-gcp
    • Labels: None
    • Environment: GCP Dataflow

    Description

      We are currently using PubSub Lite IO with the Dataflow runner.

      Our Beam job runs in streaming mode.

      Reading from a PubSub Lite subscription works correctly.

      Writing to a PubSub Lite topic does not work with this runner.

      When we look at the job graph in the Google Cloud Console, the PubSub Lite write step (transform: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink) shows no writes.

      When we check the topic, we see no output messages.

      The same code works on Beam versions 2.27, 2.28, and 2.29.

      We ran the check on the following versions, using the code below:

      • 2.27
      • 2.28
      • 2.29
      • 2.30
      • 2.31
      • 2.32
      • 2.33

       

      PubSubStreamingWriteJobOptions options =
              PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubStreamingWriteJobOptions.class);
      
      options.setStreaming(true);
      
      // Set up the file system.
      FileSystems.setDefaultPipelineOptions(options);
      
      TopicPath topicPath = TopicPath.newBuilder()
              .setProject(ProjectId.of("[PROJECT ID]"))
              .setLocation(CloudZone.of(CloudRegion.of("[REGION]"), "[ZONE CHAR]"))
              .setName(TopicName.of("[TOPIC ID]"))
              .build();
      
      
      PublisherOptions publisherOptions =
              PublisherOptions.newBuilder()
                      .setTopicPath(topicPath)
                      .build();
      
      Pipeline pipeline = Pipeline.create(options);
      
      pipeline.apply(TextIO.read()
                      .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
                      .watchForNewFiles(
                              Duration.standardMinutes(1),
                              Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
              .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP, MapElements.into(TypeDescriptor.of(PubSubMessage.class)).via(file -> {
                  Instant instant = Instant.now();
                  Message message =
                          Message.builder()
                                  .setData(ByteString.copyFromUtf8("message " + file))
                                  .setEventTime(Timestamp.newBuilder()
                                          .setNanos(instant.getNano())
                                          .setSeconds(instant.getEpochSecond())
                                          .build())
                                  .build();
      
                  return message.toProto();
              }))
              .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP, PubsubLiteIO.write(publisherOptions));
      pipeline.run();
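
As a sanity check on the placeholders, the TopicPath built above is expected to correspond to a resource path of the form projects/[PROJECT ID]/locations/[REGION]-[ZONE CHAR]/topics/[TOPIC ID]. The sketch below assumes that layout and rebuilds the same string with plain Java, so it can be compared against the topic shown in the Cloud Console. The class name, method name, and sample values are hypothetical and are not part of the Pub/Sub Lite client library.

```java
public class TopicPathSketch {
    // Hypothetical helper (not a Pub/Sub Lite API): formats the resource path
    // that the TopicPath builder in the pipeline is assumed to represent.
    static String format(String project, String region, char zoneChar, String topic) {
        return String.format(
                "projects/%s/locations/%s-%c/topics/%s", project, region, zoneChar, topic);
    }

    public static void main(String[] args) {
        // Sample values are placeholders chosen for illustration only.
        System.out.println(format("my-project", "europe-west1", 'b', "my-topic"));
        // prints "projects/my-project/locations/europe-west1-b/topics/my-topic"
    }
}
```

If the printed path does not match the topic that is being inspected, the missing output may be caused by a mismatched project, zone, or topic name rather than by the sink itself.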
      

      Could you please help us find the cause of the issue and fix it in Beam?

       

      Best regards,

      David Duarte

       

       

          People

            Assignee: Unassigned
            Reporter: David Duarte (dduarte)