Samza / SAMZA-797

"task.broadcast.inputs" does not support stream names with periods in them


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.0
    • Component/s: None
    • Labels:
      None

      Description

      TaskConfigJava.java, which parses the values of the task.broadcast.inputs property, throws an exception if a value matches neither of the following regex patterns:

      BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+"
      BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[\\d+\\-\\d+\\]+"

      Examples: system.stream#0 or system.stream#[0-5].

      However, if the stream name contains periods, an IllegalArgumentException is thrown, because the stream segment of both patterns, [^#\\.], excludes periods.
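      The check can be reproduced in isolation with java.util.regex. This is a minimal sketch, assuming the two constants hold the Java string literals below (the "+" quantifiers and the trailing "\\]+" are read from the description and may differ slightly from the actual source); the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical harness; the patterns are assumed to match those in TaskConfigJava.
public class BroadcastPatternCheck {
    static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+";
    static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[\\d+\\-\\d+\\]+";

    // True if the value fully matches either pattern (Pattern.matches anchors both ends).
    static boolean isValid(String value) {
        return Pattern.matches(BROADCAST_STREAM_PATTERN, value)
            || Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, value);
    }

    public static void main(String[] args) {
        System.out.println(isValid("system.stream#0"));     // true
        System.out.println(isValid("system.stream#[0-5]")); // true
        // The stream segment cannot contain '.', so this dotted name is rejected.
        System.out.println(isValid("databus.com.linkedin.events.gals.WhitelistedIps#0")); // false
    }
}
```

      The dotted name fails at "databus.com": the second [^#\\.]+ stops at the next period, and the pattern then requires "#".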

      Example:

      Exception in thread "main" java.lang.IllegalArgumentException: incorrect format in databus.com.linkedin.events.gals.WhitelistedIps#0. Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'
      at org.apache.samza.config.TaskConfigJava.getBroadcastSystemStreamPartitions(TaskConfigJava.java:78)
      at org.apache.samza.container.grouper.stream.GroupByPartition.<init>(GroupByPartition.java:50)
      at org.apache.samza.container.grouper.stream.GroupByPartitionFactory.getSystemStreamPartitionGrouper(GroupByPartitionFactory.java:27)
      at org.apache.samza.coordinator.JobCoordinator$.getSystemStreamPartitionGrouper(JobCoordinator.scala:138)
      at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:154)
      at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:107)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:91)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:97)
      at org.apache.samza.job.local.ProcessJobFactory.getJob(ProcessJobFactory.scala:33)
      at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
      at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
      at org.apache.samza.job.JobRunner.main(JobRunner.scala)

      Here, system=databus and stream=com.linkedin.events.gals.WhitelistedIps.
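      The intended reading treats everything before the first period as the system and everything after it, up to "#", as the stream. A minimal sketch of that split, assuming the system name itself never contains a period and the stream name never contains "#"; BroadcastValueSplit and split are hypothetical names, not Samza API.

```java
// Hypothetical helper illustrating the expected decomposition of a broadcast value.
public class BroadcastValueSplit {
    // Returns {system, stream, partitionSegment}; throws if '#' or the system prefix is missing.
    static String[] split(String value) {
        int hash = value.indexOf('#');
        if (hash < 0) {
            throw new IllegalArgumentException("missing '#' in " + value);
        }
        String systemStream = value.substring(0, hash);
        int dot = systemStream.indexOf('.');
        if (dot <= 0 || dot == systemStream.length() - 1) {
            throw new IllegalArgumentException("expected 'system.stream' in " + value);
        }
        return new String[] {
            systemStream.substring(0, dot),  // system: everything before the first period
            systemStream.substring(dot + 1), // stream: the remainder, periods included
            value.substring(hash + 1)        // partition segment, e.g. "0" or "[0-5]"
        };
    }

    public static void main(String[] args) {
        String[] parts = split("databus.com.linkedin.events.gals.WhitelistedIps#0");
        System.out.println(parts[0]); // databus
        System.out.println(parts[1]); // com.linkedin.events.gals.WhitelistedIps
        System.out.println(parts[2]); // 0
    }
}
```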

      In general, any value of the form system.stream.name#0 throws an exception.

      Stream names are sometimes fully qualified namespaces that use periods to delimit each level, so task.broadcast.inputs cannot be used with streams that follow this naming convention. "task.inputs", by contrast, accepts such names because its values are not checked against a regex pattern, which looks like an inconsistency in Samza's config parsing.

      A fix could involve changing the regex pattern from:
      BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+"

      to

      "[^#\.]+\.[^#]+#[\d]+"
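      The proposed pattern keeps the system segment period-free but lets the stream segment contain anything except "#". The sketch below checks it as a Java string literal, assuming the "+" quantifiers shown; the range variant is a hypothetical analogous relaxation that the proposal does not cover, and the class name is illustrative.

```java
import java.util.regex.Pattern;

public class ProposedPatternCheck {
    // Proposed single-partition pattern; the stream segment now excludes only '#'.
    static final String PROPOSED_PATTERN = "[^#\\.]+\\.[^#]+#[\\d]+";
    // Hypothetical range counterpart with the same relaxation (not part of the proposal).
    static final String PROPOSED_RANGE_PATTERN = "[^#\\.]+\\.[^#]+#\\[\\d+\\-\\d+\\]";

    static boolean isValid(String value) {
        return Pattern.matches(PROPOSED_PATTERN, value)
            || Pattern.matches(PROPOSED_RANGE_PATTERN, value);
    }

    public static void main(String[] args) {
        System.out.println(isValid("databus.com.linkedin.events.gals.WhitelistedIps#0"));     // true
        System.out.println(isValid("databus.com.linkedin.events.gals.WhitelistedIps#[0-5]")); // true
        System.out.println(isValid("system.stream#0")); // true
        System.out.println(isValid("stream#0"));        // false: no system prefix
        System.out.println(isValid("system.stream"));   // false: no partition
    }
}
```

      Because the system segment still excludes periods, the first period always ends the system name, so the split between system and stream stays unambiguous.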

        Attachments

        1. SAMZA-797-0.patch
          6 kB
          Navina Ramesh


            People

            • Assignee:
              Navina Ramesh (navina)
              Reporter:
              Thomas Chow (thomas_chow)
            • Votes:
              0
              Watchers:
              3
