Samza / SAMZA-797

"task.broadcast.inputs" does not support stream names with periods in them

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels: None

    Description

      TaskConfigJava.java, which parses the task.broadcast.inputs property values, throws an exception if the following regex patterns are not matched:

      BROADCAST_STREAM_PATTERN = [^#\\.]+\\.[^#\\.]+#\\d+
      BROADCAST_STREAM_RANGE_PATTERN = [^#\\.]+\\.[^#\\.]+#\\[\\d+\\-\\d+\\]+

      Example: system.stream#0, or system.stream#[0-5].

      However, if the stream name contains periods, an IllegalArgumentException will be thrown.

      Example:

      Exception in thread "main" java.lang.IllegalArgumentException: incorrect format in databus.com.linkedin.events.gals.WhitelistedIps#0. Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'
      at org.apache.samza.config.TaskConfigJava.getBroadcastSystemStreamPartitions(TaskConfigJava.java:78)
      at org.apache.samza.container.grouper.stream.GroupByPartition.<init>(GroupByPartition.java:50)
      at org.apache.samza.container.grouper.stream.GroupByPartitionFactory.getSystemStreamPartitionGrouper(GroupByPartitionFactory.java:27)
      at org.apache.samza.coordinator.JobCoordinator$.getSystemStreamPartitionGrouper(JobCoordinator.scala:138)
      at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:154)
      at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:107)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:91)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:97)
      at org.apache.samza.job.local.ProcessJobFactory.getJob(ProcessJobFactory.scala:33)
      at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
      at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
      at org.apache.samza.job.JobRunner.main(JobRunner.scala)

      Where system=databus and stream=com.linkedin.events.gals.WhitelistedIps

      In general, something like system.stream.name#0 will throw an Exception.
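
The rejection can be reproduced outside Samza with a small sketch. It assumes the quantifiers dropped from the single-partition pattern quoted in this description are `+`, and it assumes a whole-string match. The class and method names are hypothetical and are not part of TaskConfigJava.

```java
import java.util.regex.Pattern;

public class BroadcastPatternCheck {
    // Assumption: single-partition pattern from the description, with '+' quantifiers restored.
    static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+";

    // Hypothetical helper: true only if the whole input matches the pattern.
    static boolean isValid(String input) {
        return Pattern.matches(BROADCAST_STREAM_PATTERN, input);
    }

    public static void main(String[] args) {
        // Stream name without periods: accepted.
        System.out.println(isValid("system.stream#0")); // prints "true"
        // Stream name with periods: rejected, because [^#\\.]+ cannot span a '.'.
        System.out.println(isValid("databus.com.linkedin.events.gals.WhitelistedIps#0")); // prints "false"
    }
}
```

The second character class excludes periods, so the match stops at "com" and then expects a "#", which is why any dotted stream name fails.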

      Stream names are sometimes defined by their full namespace, which uses periods to delimit each level, so the broadcast stream property cannot be used for streams that follow this naming convention. "task.inputs", however, accepts such names because it does not validate values against a regex pattern, which appears to be an inconsistency in Samza's config parsing.

      A fix could involve changing the regex pattern from:

      BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+"

      to

      "[^#\\.]+\\.[^#]+#[\\d]+"
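
As a rough check of the proposed change, the sketch below applies the relaxed pattern, again assuming the missing quantifiers are `+`, and splits a matching value on its first period and its `#`. The `parse` helper and the class name are hypothetical. This is not the code in the attached patch.

```java
import java.util.regex.Pattern;

public class ProposedBroadcastPattern {
    // Assumption: proposed pattern with '+' quantifiers restored; the stream part may now contain '.'.
    static final Pattern PROPOSED = Pattern.compile("[^#\\.]+\\.[^#]+#[\\d]+");

    // Hypothetical helper: returns {system, stream, partition}.
    // The system name ends at the first '.', the stream name ends at the '#'.
    static String[] parse(String input) {
        if (!PROPOSED.matcher(input).matches()) {
            throw new IllegalArgumentException("incorrect format in " + input);
        }
        int dot = input.indexOf('.');
        int hash = input.indexOf('#');
        return new String[] {
            input.substring(0, dot),
            input.substring(dot + 1, hash),
            input.substring(hash + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("databus.com.linkedin.events.gals.WhitelistedIps#0");
        System.out.println(String.join(" | ", parts));
        // prints "databus | com.linkedin.events.gals.WhitelistedIps | 0"
    }
}
```

With this pattern the system name still cannot contain a period, since everything after the first period is treated as the stream name.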

      Attachments

        1. SAMZA-797-0.patch
          6 kB
          Navina Ramesh

          People

            Assignee: Navina Ramesh (navina)
            Reporter: Thomas Chow (thomas_chow)