Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 0.10.0
Description
TaskConfigJava.java, which parses the values of the task.broadcast.inputs property, throws an exception if a value matches neither of the following regex patterns:
BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+"
BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[\\d+\\-\\d+\\]"
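Examples of valid values: system.stream#0 and system.stream#[0-5].
However, both patterns use [^#\\.]+ for the stream part, which excludes periods. As a result, an IllegalArgumentException is thrown if the stream name itself contains periods.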
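The failure can be reproduced outside of Samza with a small standalone check. This is a sketch. The two strings below are my reading of the TaskConfigJava 0.10.0 patterns. The class and method names are hypothetical. The check also assumes that a value is accepted only when it fully matches one of the two patterns (java.util.regex.Pattern.matches).

```java
import java.util.regex.Pattern;

// Hypothetical reproduction class, not part of Samza.
public class BroadcastPatternCheck {
    // Assumed 0.10.0 patterns: neither the system nor the stream part may contain '.' or '#'.
    static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+";
    static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[\\d+\\-\\d+\\]";

    // Returns true if the whole value matches either broadcast pattern.
    static boolean isValid(String value) {
        return Pattern.matches(BROADCAST_STREAM_PATTERN, value)
            || Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, value);
    }

    public static void main(String[] args) {
        System.out.println(isValid("system.stream#0"));      // true
        System.out.println(isValid("system.stream#[0-5]"));  // true
        // The stream part stops at the second '.', so '#' is never reached.
        System.out.println(isValid("databus.com.linkedin.events.gals.WhitelistedIps#0")); // false
    }
}
```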
Example:
Exception in thread "main" java.lang.IllegalArgumentException: incorrect format in databus.com.linkedin.events.gals.WhitelistedIps#0. Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'
at org.apache.samza.config.TaskConfigJava.getBroadcastSystemStreamPartitions(TaskConfigJava.java:78)
at org.apache.samza.container.grouper.stream.GroupByPartition.<init>(GroupByPartition.java:50)
at org.apache.samza.container.grouper.stream.GroupByPartitionFactory.getSystemStreamPartitionGrouper(GroupByPartitionFactory.java:27)
at org.apache.samza.coordinator.JobCoordinator$.getSystemStreamPartitionGrouper(JobCoordinator.scala:138)
at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:154)
at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:107)
at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:91)
at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:97)
at org.apache.samza.job.local.ProcessJobFactory.getJob(ProcessJobFactory.scala:33)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Here, system=databus and stream=com.linkedin.events.gals.WhitelistedIps.
In general, any value of the form system.stream.name#0 throws this exception.
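The system/stream interpretation above amounts to splitting the value at the first period and at the '#'. The helper below is a hypothetical illustration of that reading, not a Samza API. It assumes the system name never contains a period, while the stream name may contain any number of them.

```java
// Hypothetical helper illustrating the system/stream split; not Samza code.
public class BroadcastValueSplit {
    // Splits "system.stream#partition" at the first '.' and the last '#'.
    static String[] split(String value) {
        int dot = value.indexOf('.');
        int hash = value.lastIndexOf('#');
        // Require a non-empty system, stream, and partition part.
        if (dot <= 0 || hash <= dot + 1 || hash == value.length() - 1) {
            throw new IllegalArgumentException("incorrect format in " + value);
        }
        return new String[] {
            value.substring(0, dot),         // system
            value.substring(dot + 1, hash),  // stream (may contain periods)
            value.substring(hash + 1)        // partition id or [N-M] range
        };
    }

    public static void main(String[] args) {
        String[] parts = split("databus.com.linkedin.events.gals.WhitelistedIps#0");
        System.out.println(parts[0]); // databus
        System.out.println(parts[1]); // com.linkedin.events.gals.WhitelistedIps
        System.out.println(parts[2]); // 0
    }
}
```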
Stream names are sometimes fully qualified namespaces that use periods to delimit each level. As a result, task.broadcast.inputs cannot be used for streams that follow this naming convention. By contrast, task.inputs accepts such names because its values are not checked against a regex pattern. This looks like an inconsistency in Samza's config parsing.
A fix could be to change the regex pattern from:
BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\d+"
to
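BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#]+#[\\d]+"
This still stops the system name at the first period but allows periods in the stream name.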
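A quick check compares the current and proposed single-partition patterns on the failing value. This is a sketch and the class name is hypothetical. The issue only proposes changing BROADCAST_STREAM_PATTERN. The PROPOSED_RANGE string is my own assumption of what an analogous change to the range pattern might look like.

```java
import java.util.regex.Pattern;

// Hypothetical comparison class, not part of Samza.
public class ProposedPatternCheck {
    // Current single-partition pattern: the stream part [^#.]+ rejects periods.
    static final String CURRENT = "[^#\\.]+\\.[^#\\.]+#\\d+";
    // Proposed pattern: the stream part [^#]+ allows periods, while the
    // system part still ends at the first period.
    static final String PROPOSED = "[^#\\.]+\\.[^#]+#[\\d]+";
    // Hypothetical analogous range pattern; not part of the proposal in this issue.
    static final String PROPOSED_RANGE = "[^#\\.]+\\.[^#]+#\\[\\d+\\-\\d+\\]";

    static boolean matchesCurrent(String value) {
        return Pattern.matches(CURRENT, value);
    }

    static boolean matchesProposed(String value) {
        return Pattern.matches(PROPOSED, value) || Pattern.matches(PROPOSED_RANGE, value);
    }

    public static void main(String[] args) {
        String dotted = "databus.com.linkedin.events.gals.WhitelistedIps#0";
        System.out.println(matchesCurrent(dotted));              // false
        System.out.println(matchesProposed(dotted));             // true
        System.out.println(matchesProposed("system.stream#0"));  // true
        System.out.println(matchesProposed("databus.com.linkedin.events.gals.WhitelistedIps#[0-5]")); // true
        System.out.println(matchesProposed("stream#0"));         // false: no system part
    }
}
```

Because the system part is still [^#\\.]+, the first period always separates the system from the stream. This matches the system=databus reading of the example above.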