SAMZA-2071

Skip StartpointManager instantiation for PassThroughCoordinator.

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1
    • Fix Version/s: 1.1
    • Component/s: None
    • Labels:
      None

      Description

      Samza supports PassthroughCoordinator, where the coordination between processors and the management of checkpoints are expected to be handled by an external service. The coordinator stream is used in Samza to persist metadata related to coordination and execution. JobCoordinator is not applicable to the PassthroughCoordinator case in Samza.

      SAMZA-1985 added a StartpointManager implementation that, by default, uses the coordinator stream as the startpoint metadata store. Since that change, running PassthroughCoordinator locally requires the coordinator stream and its associated coordinator configurations. If they are not defined, the log is spammed with the following exception.

      org.apache.samza.config.ConfigException: Missing job.coordinator.system configuration. Cannot proceed with job execution.
      at org.apache.samza.config.JobConfig.getCoordinatorSystemName(JobConfig.scala:145)
      at org.apache.samza.util.CoordinatorStreamUtil$.getCoordinatorSystemStream(CoordinatorStreamUtil.scala:56)
      at org.apache.samza.util.CoordinatorStreamUtil.getCoordinatorSystemStream(CoordinatorStreamUtil.scala)
      at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.<init>(CoordinatorStreamStore.java:81)
      at org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory.getMetadataStore(CoordinatorStreamMetadataStoreFactory.java:34)
      at org.apache.samza.startpoint.StartpointManager.<init>(StartpointManager.java:77)
      at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:428)
      at org.apache.samza.container.SamzaContainer.apply(SamzaContainer.scala)
      at org.apache.samza.processor.StreamProcessor.createSamzaContainer(StreamProcessor.java:333)
      at org.apache.samza.processor.StreamProcessor$1.onNewJobModel(StreamProcessor.java:405)
      at org.apache.samza.standalone.PassthroughJobCoordinator.start(PassthroughJobCoordinator.java:103)
      at org.apache.samza.processor.StreamProcessor.start(StreamProcessor.java:266)
      at java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(ConcurrentHashMap.java:4649)
      at org.apache.samza.runtime.LocalApplicationRunner.run(LocalApplicationRunner.java:130)
      at org.apache.samza.runtime.ApplicationRunner.run(ApplicationRunner.java:40)
      at org.apache.beam.runners.samza.SamzaRunner.run(SamzaRunner.java:110)
      at org.apache.beam.runners.samza.TestSamzaRunner.run(TestSamzaRunner.java:75)
      at org.apache.beam.runners.samza.runtime.SamzaStoreStateInternalsTest.testMapStateIterator(SamzaStoreStateInternalsTest.java:132)
      

      This hampers local mode development and testing of samza-beam jobs.
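
      The skip proposed in the title could be sketched roughly as below. This is only an illustration under stated assumptions, not the actual patch: StartpointManagerStub and createStartpointManager are hypothetical names, the config is modeled as a plain Map, and detecting the passthrough case through the job.coordinator.factory value is an assumption about how the check might be made.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class StartpointSkipSketch {
    // Config key named in the exception above.
    static final String COORDINATOR_SYSTEM = "job.coordinator.system";
    // Assumed key and value used to detect the passthrough coordinator.
    static final String COORDINATOR_FACTORY = "job.coordinator.factory";
    static final String PASSTHROUGH_FACTORY =
        "org.apache.samza.standalone.PassthroughJobCoordinatorFactory";

    /** Hypothetical stand-in for a manager backed by the coordinator stream. */
    static final class StartpointManagerStub {
        final String coordinatorSystem;

        StartpointManagerStub(Map<String, String> config) {
            String system = config.get(COORDINATOR_SYSTEM);
            if (system == null) {
                // Mirrors the failure mode reported in this ticket.
                throw new IllegalStateException(
                    "Missing " + COORDINATOR_SYSTEM + " configuration.");
            }
            this.coordinatorSystem = system;
        }
    }

    /** Returns an empty Optional instead of instantiating for the passthrough case. */
    static Optional<StartpointManagerStub> createStartpointManager(Map<String, String> config) {
        if (PASSTHROUGH_FACTORY.equals(config.get(COORDINATOR_FACTORY))) {
            return Optional.empty();
        }
        return Optional.of(new StartpointManagerStub(config));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(COORDINATOR_FACTORY, PASSTHROUGH_FACTORY);
        // No job.coordinator.system is set, yet nothing is thrown.
        System.out.println(createStartpointManager(config).isPresent());
    }
}
```

      With a guard of this kind, callers would need to handle the absent manager explicitly, which is why the sketch returns an Optional rather than null.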

              People

              • Assignee:
                spvenkat Shanthoosh Venkataraman
                Reporter:
                spvenkat Shanthoosh Venkataraman
              • Votes:
                0
                Watchers:
                1

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 0.5h