Beam / BEAM-11635

Unable to subscribe to several PubSub subscriptions with a ValueProvider


    Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Unresolved
    • Affects Version/s: 2.27.0
    • Fix Version/s: None
    • Labels:
      None
    • Environment:
      GCP

      Description

      When deploying a streaming job to Google Dataflow as a template, I can't subscribe to more than one PubSub subscription when the subscription name is determined at runtime (with a ValueProvider):

            final List<PCollection<String>> pCollections = new ArrayList<>();
            for (final String topic : topics) {
                // Subscription name = runtime prefix + topic, resolved only when the template runs.
                final ValueProvider<String> vpSub = ValueProvider.NestedValueProvider.of(
                        options.getSubscriptionPrefix(),
                        prefix -> prefix + topic);
                PCollection<String> messages =
                        pipeline.apply("Sub-read-" + topic, PubsubIO.readStrings().fromSubscription(vpSub))
                                // duration is a WindowFn (e.g. FixedWindows) defined elsewhere; step name made unique per topic.
                                .apply("Windowing-" + topic, Window.into(duration));
                pCollections.add(messages);
            }
      

      When launching the job, I got the following error in the logs:

      Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'projects/_project_/subscriptions/_prefix_' is consumed by multiple stages, this will result in undefined behavior.
      

      It seems that the SerializableFunction I pass to each NestedValueProvider instance is never called.
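      To illustrate why the function can appear never to run, here is a stdlib-only sketch of the lazy behaviour I understand NestedValueProvider to have: it stores the function and applies it only when get() is called at runtime. The classes RuntimeParam and Nested, and the sample prefix "projects/my-project/subscriptions/sub-", are hypothetical stand-ins, not Beam's real classes or values.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, stdlib-only model of a runtime ValueProvider and a NestedValueProvider.
// These are NOT Beam's classes; they only mimic the lazy evaluation.
public class DeferredProviderSketch {

  // Stands in for a template parameter whose value only exists at runtime.
  static final class RuntimeParam {
    final String propertyName;
    String value; // null until the job is launched with parameters

    RuntimeParam(String propertyName) { this.propertyName = propertyName; }

    boolean isAccessible() { return value != null; }

    String get() {
      if (value == null) throw new IllegalStateException(propertyName + " is not accessible yet");
      return value;
    }
  }

  // Stands in for NestedValueProvider: keeps the function and applies it lazily.
  static final class Nested {
    final RuntimeParam inner;
    final Function<String, String> fn;

    Nested(RuntimeParam inner, Function<String, String> fn) { this.inner = inner; this.fn = fn; }

    boolean isAccessible() { return inner.isAccessible(); }

    // The function runs only here, i.e. only when someone calls get().
    String get() { return fn.apply(inner.get()); }
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    RuntimeParam prefix = new RuntimeParam("subscriptionPrefix");
    Nested sub = new Nested(prefix, p -> { calls.incrementAndGet(); return p + "topicA"; });

    // At template-creation time nothing has been evaluated yet.
    System.out.println("accessible=" + sub.isAccessible() + ", calls=" + calls.get());
    // prints "accessible=false, calls=0"

    // Once the runtime value exists, get() applies the function.
    prefix.value = "projects/my-project/subscriptions/sub-";
    System.out.println(sub.get() + ", calls=" + calls.get());
    // prints "projects/my-project/subscriptions/sub-topicA, calls=1"
  }
}
```

      In this model, if a runner only forwards a property name for a non-accessible provider and never ends up calling get(), the function is simply bypassed, which would match the symptom described above.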

      It could be due to StreamingPubsubIOReadTranslator#translate in DataflowRunner (https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java), especially these lines:

            if (overriddenTransform.getSubscriptionProvider() != null) {
              if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
                stepContext.addInput(
                    PropertyNames.PUBSUB_SUBSCRIPTION,
                    overriddenTransform.getSubscription().getV1Beta1Path());
              } else {
                stepContext.addInput(
                    PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
                    ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName());
              }
            }
      

      Indeed, this code:

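      • Requires the subscription provider to be a NestedValueProvider whenever its value is not accessible before runtime, because of the unconditional cast;
      • Results in the same incorrect subscription value at runtime (only the prefix, in my case) for every read in the loop, at least at validation time. This is presumably because NestedValueProvider#propertyName() returns the property name of the option it wraps, so every read is given the same override and the function is never applied.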
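      The reasoning in the two points above can be checked with a small stdlib-only model. Assumptions: Provider, RuntimeParam, Nested and overridesFor are hypothetical stand-ins, not Beam classes; Nested.propertyName() is assumed to forward to the wrapped provider, as NestedValueProvider#propertyName() appears to do; and overridesFor only mirrors the else branch of the translator code quoted above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stdlib-only model of the propertyName() delegation; not Beam's real classes.
public class OverrideCollisionSketch {

  interface Provider {
    boolean isAccessible();
    String propertyName();
  }

  // A template parameter: not accessible at template-creation time.
  static final class RuntimeParam implements Provider {
    private final String name;
    RuntimeParam(String name) { this.name = name; }
    @Override public boolean isAccessible() { return false; }
    @Override public String propertyName() { return name; }
  }

  // Like NestedValueProvider (assumed): propertyName() forwards to the wrapped provider,
  // so the stored function never influences the returned name.
  static final class Nested implements Provider {
    private final Provider inner;
    private final Function<String, String> fn; // only used at runtime, never by propertyName()
    Nested(Provider inner, Function<String, String> fn) { this.inner = inner; this.fn = fn; }
    @Override public boolean isAccessible() { return inner.isAccessible(); }
    @Override public String propertyName() { return inner.propertyName(); }
  }

  // Mirrors the else branch: a non-accessible provider is translated to its property name.
  static List<String> overridesFor(List<String> topics) {
    Provider prefix = new RuntimeParam("subscriptionPrefix");
    List<String> overrides = new ArrayList<>();
    for (String topic : topics) {
      Provider vpSub = new Nested(prefix, p -> p + topic); // the per-topic provider from the loop
      if (!vpSub.isAccessible()) {
        overrides.add(vpSub.propertyName());
      }
    }
    return overrides;
  }

  public static void main(String[] args) {
    List<String> overrides = overridesFor(List.of("topicA", "topicB", "topicC"));
    Set<String> distinct = new LinkedHashSet<>(overrides);
    System.out.println(overrides);                     // [subscriptionPrefix, subscriptionPrefix, subscriptionPrefix]
    System.out.println("distinct=" + distinct.size()); // distinct=1
  }
}
```

      Under these assumptions every read in the loop ends up pointing at the same option, which would be consistent with the "consumed by multiple stages" error.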

       

      This seems to be the same issue as the one reported here: https://issuetracker.google.com/u/0/issues/157584222


            People

            • Assignee: Unassigned
            • Reporter: Jean-Gabriel Limbourg
            • Votes: 0
            • Watchers: 1
