Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: hello-samza
    • Labels:

      Description

      It would be nice if Samza had support for Mesos (https://mesos.apache.org/).

      The current plan is to create a MesosJob and MesosJobFactory, then look into what it would take to allow the AM code to act as a Mesos scheduler.

      The feasibility of this landing in trunk will be better understood after a rough prototype has been created.
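
      To make the plan concrete, here is a minimal sketch of what a MesosJob/MesosJobFactory pair might look like against Samza's StreamJobFactory/StreamJob interfaces. Everything Mesos-specific below is an assumption, not settled design, and the driver wiring is elided:

      package org.apache.samza.job.mesos

      import org.apache.samza.config.Config
      import org.apache.samza.job.{ApplicationStatus, StreamJob, StreamJobFactory}

      /* Hypothetical factory, mirroring YarnJobFactory: returns a job that
       * would register a Mesos framework instead of submitting a YARN app. */
      class MesosJobFactory extends StreamJobFactory {
        def getJob(config: Config): StreamJob = new MesosJob(config)
      }

      class MesosJob(config: Config) extends StreamJob {
        // submit would create and start a MesosSchedulerDriver here.
        def submit: StreamJob = this

        // kill would stop the driver, which tears down the framework.
        def kill: StreamJob = this

        // Status plumbing elided; a real implementation would track the
        // framework's lifecycle instead of always reporting Running.
        def waitForFinish(timeoutMs: Long): ApplicationStatus = ApplicationStatus.Running
        def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = status
        def getStatus: ApplicationStatus = ApplicationStatus.Running
      }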

      Attachments

      1. Banno-samza-mesos-1.3.0-1-g62c624d.tar.gz
        25 kB
        Zach Cox
      2. SAMZA-375.patch
        7.16 MB
        Jon Bringhurst
      3. samza-mesos-multiple-jobs.jpg
        1.67 MB
        Jon Bringhurst
      4. Screen Shot 2014-09-22 at 8.59.12 AM.png
        107 kB
        Jon Bringhurst
      5. Screen Shot 2014-08-23 at 5.51.39 PM.png
        156 kB
        Jon Bringhurst

        Issue Links

          Activity

          jonbringhurst Jon Bringhurst added a comment - - edited

          A modified hello-samza with support for creating a basic Mesos grid (no Samza integration yet) can be found at https://github.com/fintler/hello-samza/tree/SAMZA-375

          To bootstrap the grid, use:

          cd hello-samza
          ./bin/grid bootstrap-mesos
          

          After it finishes, head to http://localhost:5050/ for the Mesos console.

          I tried running this on jdk8u11 at first, but the build complains loudly and fails on javadoc errors. It worked after switching to jdk7u51.

          Also, the Mesos build is painfully slow. There appears to be a bug in the autotools setup that prevents parallel builds (-j#) from working properly.

          jonbringhurst Jon Bringhurst added a comment -

          I'm keeping the modifications to Samza itself at:

          https://github.com/fintler/samza/tree/SAMZA-375/samza-mesos/src/main/scala/org/apache/samza/job/mesos

          Please pardon my dust and terrible understanding of Scala.

          jonbringhurst Jon Bringhurst added a comment - - edited

          The basic design for this is in place (but not completely implemented yet).

          The current state (how many tasks are running/allocated/etc.) will be stored in zookeeper (not implemented yet) by:

          https://github.com/fintler/samza/blob/SAMZA-375/samza-mesos/src/main/scala/org/apache/samza/job/mesos/SamzaSchedulerState.scala

          As Mesos offers resources (the resourceOffers callback), the SamzaSchedulerState is modified by:

          https://github.com/fintler/samza/blob/SAMZA-375/samza-mesos/src/main/scala/org/apache/samza/job/mesos/SamzaScheduler.scala

          By keeping the state in zookeeper, the scheduler should be able to fail and reregister() without requiring the samza tasks to restart.

          Any thoughts on the overall design?
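
          For concreteness, here is a rough sketch of what that resourceOffers callback might do under this design. SchedulerState is an invented stand-in for SamzaSchedulerState, and the resource sizes and names are placeholders:

          import java.util.{Arrays => JArrays, List => JList}
          import scala.collection.JavaConverters._
          import org.apache.mesos.{Scheduler, SchedulerDriver}
          import org.apache.mesos.Protos._

          trait SchedulerState { // stand-in for SamzaSchedulerState
            def neededContainers: Int
            def nextTaskId(): Int
            def recordLaunch(taskId: TaskID, slaveId: SlaveID): Unit
          }

          class SketchScheduler(state: SchedulerState) extends Scheduler {
            override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
              for (offer <- offers.asScala) {
                if (state.neededContainers > 0) {
                  val taskId = TaskID.newBuilder.setValue(state.nextTaskId().toString).build
                  val task = TaskInfo.newBuilder
                    .setName("samza-container-" + taskId.getValue)
                    .setTaskId(taskId)
                    .setSlaveId(offer.getSlaveId)
                    .setCommand(CommandInfo.newBuilder.setValue("bin/run-container.sh"))
                    .addResources(Resource.newBuilder.setName("cpus")
                      .setType(Value.Type.SCALAR)
                      .setScalar(Value.Scalar.newBuilder.setValue(1.0)))
                    .build
                  // Record the claim before launching, so a restarted scheduler
                  // can reconcile from stored state instead of double-launching.
                  state.recordLaunch(taskId, offer.getSlaveId)
                  driver.launchTasks(offer.getId, JArrays.asList(task))
                } else {
                  driver.declineOffer(offer.getId)
                }
              }
            }

            // Remaining Scheduler callbacks stubbed for brevity.
            override def registered(d: SchedulerDriver, id: FrameworkID, m: MasterInfo): Unit = ()
            override def reregistered(d: SchedulerDriver, m: MasterInfo): Unit = ()
            override def offerRescinded(d: SchedulerDriver, id: OfferID): Unit = ()
            override def statusUpdate(d: SchedulerDriver, status: TaskStatus): Unit = ()
            override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, data: Array[Byte]): Unit = ()
            override def disconnected(d: SchedulerDriver): Unit = ()
            override def slaveLost(d: SchedulerDriver, s: SlaveID): Unit = ()
            override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = ()
            override def error(d: SchedulerDriver, message: String): Unit = ()
          }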

          criccomini Chris Riccomini added a comment -

          By keeping the state in zookeeper, the scheduler should be able to fail and reregister() without requiring the samza tasks to restart.

          Do we ever run into a race condition where the Mesos container has been allocated and started, but the ZK state has not yet been saved?

          Any thoughts on the overall design?

          Longer term, we're going to be providing a state-management facility called the ConfigLog (SAMZA-348). It might be possible to store this state in the ConfigLog, rather than ZooKeeper. Thus far, we've been able to avoid all direct dependencies on ZooKeeper in Samza (we still use it transitively through Kafka, though that is going away too). Not something that you have to worry about now, but we should consider this use case as part of SAMZA-348.

          Also, we should extract out the shared logic between the YARN AM and the SamzaScheduler, but this isn't something you should worry about right now. We can open a separate ticket for it.

          Naive question: does Mesos provide any state facility itself? If the scheduler fails and is restarted, will it tell you what was running? If so, does it give you any metadata? Perhaps we could just store enough information in the metadata about the already-existing containers.

          Another naive comment: In YARN, right now, we just nuke all containers when an AM restarts. We then start from scratch and request new resources. This could be a simpler (but less efficient) alternative strategy to the ZK-based scheduler state.

          jonbringhurst Jon Bringhurst added a comment -

          Do we ever run into a race condition where the Mesos container has been allocated and started, but the ZK state has not yet been saved?

          Right now I'm just following the pattern that I see in most Mesos examples (which appears to have a race condition). Once things are working, it shouldn't be much effort to do a prepare/commit and have the reregister handle the state in between the two.

          Naive question: does Mesos provide any state facility itself? If the scheduler fails and is restarted, will it tell you what was running? If so, does it give you any metadata? Perhaps we could just store enough information in the metadata about the already-existing containers.

          There are a few. When I say "stored in zookeeper", I really mean Mesos' abstraction for ZK (org.apache.mesos.state.ZooKeeperState):

          https://github.com/apache/mesos/tree/master/src/java/src/org/apache/mesos/state

          I'll dig into SAMZA-348. It seems like an implementation of org.apache.mesos.state.State that uses ConfigLog might be a possibility.
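
          For illustration, a minimal sketch of reading and writing state through that abstraction; the znode path and variable name are assumptions:

          import java.util.concurrent.TimeUnit
          import org.apache.mesos.state.{State, Variable, ZooKeeperState}

          object SchedulerStateStore {
            def main(args: Array[String]): Unit = {
              val state: State = new ZooKeeperState("localhost:2181", 10, TimeUnit.SECONDS, "/samza")

              // fetch returns a future; get() blocks until ZooKeeper answers.
              val variable: Variable = state.fetch("wikipedia-feed").get()

              // mutate produces a new version of the variable's bytes.
              val updated = variable.mutate("running=1".getBytes("UTF-8"))

              // store only succeeds if nobody changed the variable since the
              // fetch, which gives a simple compare-and-swap on reregistration.
              state.store(updated).get()
            }
          }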

          jonbringhurst Jon Bringhurst added a comment - - edited

          I've attached a teaser screenshot of hello-samza's wikipedia-feed running in Mesos. There's still a ton of work to be done before this should be considered for trunk, but at least Samza jobs are able to run in Mesos now.

          jonbringhurst Jon Bringhurst added a comment -

          Just to clarify the state of this, the job is running, but the tasks are not. Once the env vars are set up properly, it seems like it might work:

          2014-08-24 11:20:57 JobRunner [INFO] job factory: org.apache.samza.job.mesos.MesosJobFactory
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@716: Client environment:host.name=localhost.localdomain
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@724: Client environment:os.arch=3.10.0-123.6.3.el7.x86_64
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@725: Client environment:os.version=#1 SMP Wed Aug 6 21:12:36 UTC 2014
          2014-08-24 11:20:57,216:14126(0x7f52c940d700):ZOO_INFO@log_env@733: Client environment:user.name=jbringhu
          2014-08-24 11:20:57,217:14126(0x7f52c940d700):ZOO_INFO@log_env@741: Client environment:user.home=/home/jbringhu
          2014-08-24 11:20:57,217:14126(0x7f52c940d700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/jbringhu/samza-dev/hello-samza
          2014-08-24 11:20:57,217:14126(0x7f52c940d700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=127.0.0.1:2181 sessionTimeout=10000 watcher=0x7f52cefa2880 sessionId=0 sessionPasswd=<null> context=0x7f52a8001070 flags=0
          2014-08-24 11:20:57,231:14126(0x7f52c7c0a700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
          2014-08-24 11:20:57 SamzaScheduler [INFO] No mesos.executor.count specified. Defaulting to one container.
          2014-08-24 11:20:57,251:14126(0x7f52c7c0a700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x148089b0c550005, negotiated timeout=10000
          2014-08-24 11:20:57 VerifiableProperties [INFO] Verifying properties
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property batch.num.messages is overridden to 1
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property client.id is overridden to samza_admin-wikipedia_feed-1-1408893657274-0
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property metadata.broker.list is overridden to localhost:9092
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property producer.type is overridden to sync
          2014-08-24 11:20:57 VerifiableProperties [INFO] Verifying properties
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property client.id is overridden to samza_admin-wikipedia_feed-1-1408893657274-0
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property group.id is overridden to undefined-samza-consumer-group-8ef2ae53-c3ac-42e9-99d9-1d849d59b6a6
          2014-08-24 11:20:57 VerifiableProperties [INFO] Property zookeeper.connect is overridden to localhost:2181/
          2014-08-24 11:20:57 Util$ [INFO] Instantiating type org.apache.samza.container.grouper.stream.GroupByPartitionFactory to build SystemStreamPartition groupings
          2014-08-24 11:20:57 Util$ [INFO] SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@e1abbbd has grouped the SystemStreamPartitions into the following taskNames:
          2014-08-24 11:20:57 Util$ [INFO] TaskName: Partition 0 => [SystemStreamPartition [wikipedia, #en.wiktionary, 0], SystemStreamPartition [wikipedia, #en.wikinews, 0], SystemStreamPartition [wikipedia, #en.wikipedia, 0]]
          2014-08-24 11:20:57 Util$ [INFO] Assigning 1 SystemStreamPartitions taskNames to 1 containers.
          2014-08-24 11:20:57 Util$ [INFO] Grouped SystemStreamPartition TaskNames (size = 1): 
          2014-08-24 11:20:57 Util$ [INFO] Container number: 0 => Map(Partition 0 -> Set(SystemStreamPartition [wikipedia, #en.wiktionary, 0], SystemStreamPartition [wikipedia, #en.wikinews, 0], SystemStreamPartition [wikipedia, #en.wikipedia, 0]))
          2014-08-24 11:20:57 Util$ [INFO] Previous mapping of taskNames to partition: List()
          2014-08-24 11:20:57 Util$ [INFO] Current set of taskNames: List(Partition 0)
          2014-08-24 11:20:57 Util$ [WARN] No previous taskName mapping defined.  This is OK if it's the first time the job is being run, otherwise data may have been lost.
          2014-08-24 11:20:57 Util$ [INFO] No taskNames are missing between this run and previous
          2014-08-24 11:20:57 Util$ [WARN] The following new taskNames have been added in this job run: Set(Partition 0)
          2014-08-24 11:20:57 Util$ [INFO] New taskName to partition mapping: List((Partition 0,0))
          2014-08-24 11:20:57 SamzaScheduler [INFO] Awaiting offers for 1 executors
          I0824 11:20:57.433486 14158 sched.cpp:126] Version: 0.19.1
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@716: Client environment:host.name=localhost.localdomain
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@724: Client environment:os.arch=3.10.0-123.6.3.el7.x86_64
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@725: Client environment:os.version=#1 SMP Wed Aug 6 21:12:36 UTC 2014
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@733: Client environment:user.name=jbringhu
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@741: Client environment:user.home=/home/jbringhu
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/jbringhu/samza-dev/hello-samza
          2014-08-24 11:20:57,435:14126(0x7f52cac10700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=10000 watcher=0x7f52cefa2880 sessionId=0 sessionPasswd=<null> context=0x7f52d4001040 flags=0
          2014-08-24 11:20:57,436:14126(0x7f52c69ff700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
          2014-08-24 11:20:57,440:14126(0x7f52c69ff700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x148089b0c550006, negotiated timeout=10000
          I0824 11:20:57.441314 14173 group.cpp:310] Group process ((5)@127.0.0.1:45052) connected to ZooKeeper
          I0824 11:20:57.441344 14173 group.cpp:784] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
          I0824 11:20:57.441359 14173 group.cpp:382] Trying to create path '/mesos' in ZooKeeper
          I0824 11:20:57.442654 14173 detector.cpp:135] Detected a new leader: (id='3')
          I0824 11:20:57.442818 14173 group.cpp:655] Trying to get '/mesos/info_0000000003' in ZooKeeper
          I0824 11:20:57.443436 14173 detector.cpp:377] A new leading master (UPID=master@127.0.0.1:5050) is detected
          I0824 11:20:57.443491 14173 sched.cpp:222] New master detected at master@127.0.0.1:5050
          I0824 11:20:57.443764 14173 sched.cpp:230] No credentials provided. Attempting to register without authentication
          I0824 11:20:57.447118 14173 sched.cpp:397] Framework registered with wikipedia-feed
          2014-08-24 11:20:57 SamzaScheduler [INFO] Framework registered
          2014-08-24 11:20:57 SamzaScheduler [INFO] Received offer id {
            value: "20140824-112052-16777343-5050-14035-0"
          }
          framework_id {
            value: "wikipedia-feed"
          }
          slave_id {
            value: "20140824-033930-16777343-5050-12649-0"
          }
          hostname: "localhost"
          resources {
            name: "cpus"
            type: SCALAR
            scalar {
              value: 2.0
            }
            role: "*"
          }
          resources {
            name: "mem"
            type: SCALAR
            scalar {
              value: 2761.0
            }
            role: "*"
          }
          resources {
            name: "disk"
            type: SCALAR
            scalar {
              value: 12798.0
            }
            role: "*"
          }
          resources {
            name: "ports"
            type: RANGES
            ranges {
              range {
                begin: 31000
                end: 32000
              }
            }
            role: "*"
          }
          
          2014-08-24 11:20:57 SamzaScheduler [INFO] Got available task id (0) for offer: id {
            value: "20140824-112052-16777343-5050-14035-0"
          }
          framework_id {
            value: "wikipedia-feed"
          }
          slave_id {
            value: "20140824-033930-16777343-5050-12649-0"
          }
          hostname: "localhost"
          resources {
            name: "cpus"
            type: SCALAR
            scalar {
              value: 2.0
            }
            role: "*"
          }
          resources {
            name: "mem"
            type: SCALAR
            scalar {
              value: 2761.0
            }
            role: "*"
          }
          resources {
            name: "disk"
            type: SCALAR
            scalar {
              value: 12798.0
            }
            role: "*"
          }
          resources {
            name: "ports"
            type: RANGES
            ranges {
              range {
                begin: 31000
                end: 32000
              }
            }
            role: "*"
          }
          
          2014-08-24 11:20:57 SamzaScheduler [INFO] Claimed SSP taskNames Map(Partition 0 -> Set(SystemStreamPartition [wikipedia, #en.wiktionary, 0], SystemStreamPartition [wikipedia, #en.wikinews, 0], SystemStreamPartition [wikipedia, #en.wikipedia, 0])) for offer ID 0
          2014-08-24 11:20:57 SamzaScheduler [INFO] Task ID 0 using command bin/run-container.sh
          2014-08-24 11:20:57 SamzaScheduler [INFO] Launching task 0
          2014-08-24 11:20:57 SamzaScheduler [INFO] Started task ID 0
          2014-08-24 11:20:58 SamzaScheduler [INFO] (Status Update for Task %s: %s,value: "0"
          ,TASK_RUNNING)
          2014-08-24 11:20:59 SamzaScheduler [INFO] (Status Update for Task %s: %s,value: "0"
          ,TASK_FAILED)
          
          criccomini Chris Riccomini added a comment -

          This is awesome! I'm very excited to see progress here. Once you've got things running, let us know, and we can provide suggestions on how to make it trunk-ready.

          One question regarding the screencap. I'm not terribly familiar with Mesos. Based on the UI, it looks like it's looking at each Samza job as an independent "framework". Is this the suggested way to integrate Samza with Mesos, or do we need to provide some kind of scheduler, which Mesos sees just one of, and is "the framework"? I could see us wanting to move the YARN AM logic to a single centralized UI in Mesos, which plugs into Mesos. This UI/service could then manage all jobs.

          I'm just not too clear on what the standard pattern is here.

          criccomini Chris Riccomini added a comment -

          Also, regarding this log line: "Received offer id ...", could we make it a single-line log? Not too excited about multi-line logs.

          jonbringhurst Jon Bringhurst added a comment - - edited

          One question regarding the screencap. I'm not terribly familiar with Mesos. Based on the UI, it looks like it's looking at each Samza job as an independent "framework". Is this the suggested way to integrate Samza with Mesos, or do we need to provide some kind of scheduler, which Mesos sees just one of, and is "the framework"? I could see us wanting to move the YARN AM logic to a single centralized UI in Mesos, which plugs into Mesos. This UI/service could then manage all jobs.
          I'm just not too clear on what the standard pattern is here.

          Hey Chris Riccomini, here's what I have in mind: Each Samza job would run as a framework which manages tasks within a single job (the scheduler). Then, another "meta-framework" (a phrase that mesosphere seems to be using) would run to manage each of the frameworks for the Samza jobs. Initially, the meta-framework would be something like Marathon or Aurora, but could later evolve into a Samza-specific meta-framework, perhaps with a special interface for managing multiple Samza jobs and streams. Also, a Samza-specific meta-framework may be a useful place to download project packages from, or at least coordinate where the packages may be located. In the far future, something like Marathon's eventbus may be useful for communicating with Kafka (or other applications).

          Another alternative would be to have a scheduler for multiple jobs. However, it seems like this would require large changes to Samza (the MesosJobLauncher concept wouldn't really fit well) and would reduce fault tolerance, since it would concentrate complex logic within a single process. That said, it wouldn't reduce fault tolerance that much, so I'm definitely open to the idea. Perhaps the MesosJobLauncher could make an RPC call out to a Samza Mesos scheduler, which would then coordinate resources for the new job (I'm just brainstorming here; this specific idea seems kludgy).

          Having said that, I'd definitely be interested in hearing the thoughts of anyone who's familiar with designing frameworks on Mesos. I'm open to ideas – this is just a prototype.

          Also, regarding this log line: "Received offer id ...", could we make it a single-line log? Not too excited about multi-line logs.

          Agreed. This was just for debugging purposes.

          criccomini Chris Riccomini added a comment -

          Each Samza job would run as a framework which manages tasks within a single job (the scheduler). Then, another "meta-framework" (a phrase that mesosphere seems to be using) would run to manage each of the frameworks for the Samza jobs. Initially, the meta-framework would be something like Marathon or Aurora, but could later evolve into a Samza-specific meta-framework

          Cool, this makes sense to me.

          criccomini Chris Riccomini added a comment -

          Relevant papers for those interested in Mesos:

          http://eurosys2013.tudos.org/wp-content/uploads/2013/paper/Schwarzkopf.pdf
          https://www.usenix.org/legacy/event/nsdi11/tech/full_papers/Hindman_new.pdf

          jonbringhurst Jon Bringhurst added a comment - - edited

          Just a quick update – this is moving along slowly. Here's a new screenshot (https://issues.apache.org/jira/secure/attachment/12670432/Screen%20Shot%202014-09-22%20at%208.59.12%20AM.png) with a task running after passing constraints.

          tnachen Timothy Chen added a comment -

          Hi Jon Bringhurst, not sure if you remember me (I used to be on the Kafka team), but I'm interested in helping port Samza to Mesos.
          Is the effort you're doing open-sourced somewhere?

          joestein Joe Stein added a comment -

          I can spin up Samza on a Mesos cluster and give it a user-facing test drive once you have a patch. Exciting!

          jonbringhurst Jon Bringhurst added a comment -

          Hey Timothy Chen, of course I remember you.

          The bulk of the code is in https://github.com/fintler/samza/tree/SAMZA-375/samza-mesos/src/main/scala/org/apache/samza/job/mesos

          And there's a hacked-together hello-samza at https://github.com/fintler/hello-samza/tree/SAMZA-375

          I've been trying not to sit on anything for more than a day, so those are up to date.

          Keep in mind that I haven't written any Scala before this, so it might be a bit rough.

          criccomini Chris Riccomini added a comment -

          Joe Stein, Timothy Chen, one thing that you guys might be able to clear up for us is how we're supposed to build the equivalent of YARN's AM. Jon and I are both pretty new to Mesos, and it's unclear what the right architecture is.

          In the YARN world, we have one ApplicationMaster (AM) per Samza job. Each of these ApplicationMasters is responsible for coordinating amongst its containers (executors in Mesos, I believe). The AM is responsible for requesting resources, defining what runs on each executor, and deciding what to do when one of the executors fails.

          In Jon's current patch, we have basically converted the YARN AM to a framework. Thus, each Samza job appears as its own "framework" in the Mesos UI. This seems kind of odd. I'm wondering if we have the wrong abstraction, and whether we should instead have one Samza framework which coordinates amongst all Samza jobs on a single Mesos cluster.

          Can you guys provide some guidance here?

          tnachen Timothy Chen added a comment -

          Hi Chris Riccomini, Jon Bringhurst,

          A framework in Mesos is associated with one role (for now) in the Mesos master, and Mesos schedules and distributes resources according to a role:weight mapping. For example, if Samza and Hadoop come up as the "samza" and "hadoop" roles with weights 3:1, then Samza will get 3/4 of the cluster resources and Hadoop 1/4; by default, each role has a weight of one.

          So if you create a framework per job, Mesos will start to distribute resources unevenly.

          So I think we should model Samza as a whole as a single framework. The good thing about a Mesos framework is that it can decide for itself how to allocate the offered resources to its executors, and it can do so dynamically by keeping its own mapping of Samza job -> list<executor> inside the framework.

          I haven't really dived much into Samza beyond the high-level understanding I got during the time I was at LI.

          Where can I take a look at what a YARN container is currently doing?

          jonbringhurst Jon Bringhurst added a comment - - edited

          Hey Timothy Chen, in terms of this image (https://i.imgur.com/PSIduMX.png), the AM is mostly centered around onContainerAllocated in https://github.com/apache/incubator-samza/blob/350909d6c250fa1ae4d68c0c9b144ca756f84cf1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala#L106, the client is basically https://github.com/apache/incubator-samza/blob/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala, and the YARN container is wrapped with https://github.com/apache/incubator-samza/blob/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnContainer.scala.

          It might be useful to start from YarnJobFactory to see the setup, then review the path from SamzaAppMaster to YarnAppMasterListener/SamzaAppMasterTaskManager. The directory listing for this is at https://github.com/apache/incubator-samza/tree/master/samza-yarn/src/main/scala/org/apache/samza/job/yarn

          jonbringhurst Jon Bringhurst added a comment -

          Also, the full presentation where that image came from is at:

          http://www.slideshare.net/criccomini/building-applications-on-yarn

          criccomini Chris Riccomini added a comment -

          Expanding on Jon's comments. The lifecycle of a Samza job in YARN looks like this:

          1. User runs run-job.sh with a config.
          2. run-job.sh negotiates with the YARN RM for a place to start Samza's YARN AM.
          3. The YARN RM (and NM) start the job's AM on a random host in the cluster.
          4. The AM looks at the job's config, and figures out how many containers it needs, what size, etc., to run the job in question.
          5. The AM requests containers from the YARN RM according to what's needed based on the job's config.
          6. The AM heartbeats to the RM every N seconds, and gets containers assigned to it as time passes (similar to offers in Mesos).
          7. When the AM receives a new container, it tells YARN to start the container (on a remote host) by running the bash command run-container.sh. It also sets some environment variables when it's talking to YARN about starting the container, which YARN forwards to the SamzaContainer that's started. These variables tell the SamzaContainer which partitions (Kafka input partitions) it should read from, and also the job's configuration. Using these two things, the SamzaContainer starts up and runs (see the sketch after this list for a possible Mesos analog).
          8. When a SamzaContainer fails, YARN's RM notifies the AM (during the AM's next heartbeat) that a container is dead. The AM then requests a new container, and when it arrives, assigns the dead partitions to it (via environment variable, again), and starts it up.
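
          To make step 7's Mesos analog concrete, here is a hedged sketch of passing the container's identity and package through CommandInfo in the Mesos protos. The environment variable names and the helper itself are illustrative, not Samza's actual ones:

          import org.apache.mesos.Protos._

          object ContainerCommand {
            def build(containerId: Int, packageUrl: String): CommandInfo = {
              // Hypothetical variable telling the SamzaContainer who it is;
              // the real assignment mechanism would mirror the YARN env vars.
              val env = Environment.newBuilder
                .addVariables(Environment.Variable.newBuilder
                  .setName("SAMZA_CONTAINER_ID").setValue(containerId.toString))
                .build

              CommandInfo.newBuilder
                .setValue("bin/run-container.sh")                         // same script YARN runs
                .addUris(CommandInfo.URI.newBuilder.setValue(packageUrl)) // job package for the slave to fetch
                .setEnvironment(env)
                .build
            }
          }
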
          criccomini Chris Riccomini added a comment -

          A key point in the YARN flow is that the scheduling is split. The RM decides who gets which offers based on the ResourceRequest, but once a resource has been given to an AM, the AM decides what to do with it.

          If we run a single "Samza" framework in Mesos, and that framework makes scheduling decisions for all Samza jobs, it seems akin to having YARN's RM make all scheduling decisions for all Samza jobs, including which containers process which partitions, etc. Having this logic decentralized seems kind of nice, at first glance. Not sure how Mesos handles this kind of pattern.

          tnachen Timothy Chen added a comment -

          Jon Bringhurst, Chris Riccomini, thanks for the explanations; this is really useful!

          So a framework in Mesos is usually not scheduled and placed somewhere in the cluster by Mesos, but started directly as a single instance that schedules tasks based on resource offers. Once scheduled, the tasks can either be run by Mesos with a specified command, or frameworks like Samza can implement a custom executor that is responsible for starting a task and reporting its status (started, finished/failed); the framework can also receive messages from the executor, which can contain data.

          I think it's actually OK to launch a framework per job as long as they all use the same role. It is Samza's responsibility to launch the framework itself, though; Samza then doesn't have a full view of how many resources it has in total, each job is independently resource-scheduled by Mesos, and the number of parallel frameworks that can get resources allocated is bounded by the number of slaves available as well.

          Launching just one framework, Samza can be more intelligent about placing resources itself, and can launch multiple tasks that each use a portion of them for the entirety of the job, but it does become a more monolithic resource controller.

          Just some preliminary thoughts; I think regardless we can continue down the path we have and get a functional prototype to try out.

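          To illustrate the custom-executor option, here is a minimal sketch against org.apache.mesos.Executor; the container bootstrap itself is elided, and nothing here is committed design:

          import org.apache.mesos.{Executor, ExecutorDriver}
          import org.apache.mesos.Protos._

          class SketchExecutor extends Executor {
            override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
              driver.sendStatusUpdate(TaskStatus.newBuilder
                .setTaskId(task.getTaskId).setState(TaskState.TASK_RUNNING).build)
              // ... start the SamzaContainer here; on exit, report
              // TASK_FINISHED or TASK_FAILED the same way.
            }

            override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit =
              driver.sendStatusUpdate(TaskStatus.newBuilder
                .setTaskId(taskId).setState(TaskState.TASK_KILLED).build)

            // Remaining Executor callbacks stubbed for brevity.
            override def registered(d: ExecutorDriver, e: ExecutorInfo, f: FrameworkInfo, s: SlaveInfo): Unit = ()
            override def reregistered(d: ExecutorDriver, s: SlaveInfo): Unit = ()
            override def disconnected(d: ExecutorDriver): Unit = ()
            override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]): Unit = ()
            override def shutdown(d: ExecutorDriver): Unit = ()
            override def error(d: ExecutorDriver, message: String): Unit = ()
          }
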
          jonbringhurst Jon Bringhurst added a comment - - edited

          Here's a whiteboard sketch of what I currently have in my head for a multiple-job architecture using the current design in my github repo.

          https://issues.apache.org/jira/secure/attachment/12671836/samza-mesos-multiple-jobs.jpg (sorry about the blurriness).

          Some parts are interchangeable (like Marathon, the org-specific front-end, and haproxy), but this should give you an idea of what I'm thinking in general. The main benefit that I see to running a scheduler per job is the ability to easily run multiple versions of Samza, which would then allow us to upgrade Samza jobs one by one.

          jonbringhurst Jon Bringhurst added a comment -

          The next steps for this:

          1. Wait for SAMZA-348.
          2. Clean up sections where config is hardcoded.
          3. Write unit tests.
          4. Integration tests?
          tnachen Timothy Chen added a comment -

          Sounds good, let me know how I can help in any way!

          jonbringhurst Jon Bringhurst added a comment -

          This is also waiting on the API for org.apache.samza.coordinator.JobCoordinator to stabilize a bit (SAMZA-444).

          tnachen Timothy Chen added a comment -

          Jon Bringhurst, I just thought about this ticket and see that the dependent ticket you mentioned is resolved now. Are you planning to continue finishing the Samza framework?

          criccomini Chris Riccomini added a comment -

          Timothy Chen, we're still planning to do this. Unfortunately, I think the ticket that Jon pointed at was the wrong one. When SAMZA-348 is resolved, this ticket will be unblocked. The gist of things is that we're abstracting a lot of logic out of the YARN AM and into a JobCoordinator, which we can then use within a Mesos framework.

          tnachen Timothy Chen added a comment -

          Chris Riccomini thanks for the update, I'll check back when that's resolved then.

          kostassoid Konstantin Alexandroff added a comment -

          Hello everyone,
for everyone interested: we were able to integrate with Mesos using the latest snapshot version of Samza, which means we're using the JobCoordinator. Configuration immutability is still an issue, but it's not critical in our case. Apart from that, everything seems to be working fine. Am I missing something crucial?
          We're planning to release the adapter to GitHub soon.
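
For context, the hook such an adapter plugs into on the Samza side is StreamJobFactory. A skeletal sketch of the shape it takes, where the stubbed bodies are placeholders rather than what any of the forks actually do:

import org.apache.samza.config.Config
import org.apache.samza.job.{ApplicationStatus, StreamJob, StreamJobFactory}

class MesosJobFactory extends StreamJobFactory {
  def getJob(config: Config): StreamJob = new MesosJob(config)
}

class MesosJob(config: Config) extends StreamJob {
  def submit(): StreamJob = { /* start the Mesos scheduler driver for this job */ this }
  def kill(): StreamJob = { /* stop the driver, killing the job's tasks */ this }
  def waitForFinish(timeoutMs: Long): ApplicationStatus = ApplicationStatus.SuccessfulFinish
  def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = status
  def getStatus(): ApplicationStatus = ApplicationStatus.Running
}

A job then selects the adapter with the standard job.factory.class config key, pointed at the factory class in whichever fork is used.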

          criccomini Chris Riccomini added a comment -

Konstantin Alexandroff, very cool! I don't think you're missing anything crucial. I suppose using the term "blocked on SAMZA-348" is a bit of overkill. We were simply thinking of doing the integration after SAMZA-348 in order to avoid churn. It's not yet clear how much the JobCoordinator changes are going to impact the YARN AM, and what logic will land where. We were planning on holding off on the Mesos integration until this was clearer. The basic steps we were looking at were:

1. Add a skeleton JobCoordinator (JC). (SAMZA-438, done)
          2. Make JC talk to SamzaContainer via HTTP. (SAMZA-444, done)
          3. Make JobRunner talk to JC via CoordinatorStream. (SAMZA-448, patch available, blocked on SAMZA-226 to avoid merge conflicts)
          4. Eliminate the CheckpointManager and use CoordinatorStream instead. (SAMZA-465)

From there, it will be clear what logic we can take out of the YARN AM and put in the JobCoordinator. The idea is to invert the control between the AM and the JC. Currently the AM tells the JC what to do; we're going to switch it so that the JC tells YARN what to do. Once that's done, integrating with Mesos should be pretty trivial: we'll just have a plugin to tell Mesos what to do instead of YARN.
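
To make the inversion concrete, the plugin boundary could be as small as something like this (purely illustrative, not the actual SAMZA-348 API):

// Hypothetical interface the JobCoordinator would call; a YARN and a Mesos
// implementation would each translate these calls for their cluster manager.
trait ClusterManagerPlugin {
  // Ask the cluster manager to start container `containerId` with the given
  // resources, running the standard SamzaContainer entry point.
  def requestContainer(containerId: Int, memoryMb: Int, cpuCores: Int): Unit

  // Ask the cluster manager to stop a running container.
  def releaseContainer(containerId: Int): Unit
}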

          That said, this is a fair amount of work. If you've got something working, feel free to post it. I am personally hesitant to merge it in, because the JobCoordinator changes are already quite messy, but it may be of use to folks in the meantime.

          criccomini Chris Riccomini added a comment -

          BTW, if you post it, please put the link to GitHub on the ticket, so we can all track it.

          kostassoid Konstantin Alexandroff added a comment -

Chris Riccomini, thanks for the details!
We have been watching Samza development closely for the past few months and really like the way it is going. But business goals require us to release faster, so we don't have much choice.
And of course, once we release the samza-mesos integration, I'll put the link here.

          kostassoid Konstantin Alexandroff added a comment -

          As promised, our attempt at integration:
          https://github.com/InnovaCo/samza-mesos

          criccomini Chris Riccomini added a comment -

          Konstantin Alexandroff, awesome, thanks for this! Will probably use it as a reference when we finish SAMZA-348 job coordinator work.

          zcox Zach Cox added a comment -

          Hi - we also need to run Samza jobs on Mesos - I backported the InnovaCo samza-mesos to Samza 0.8.0, as we're a bit hesitant to base things off 0.9.0-SNAPSHOT:

          https://github.com/banno/samza-mesos/tree/samza-0.8.0

          I was able to submit several Samza jobs to Marathon using our fork and see all of the Samza containers run successfully as Mesos tasks. We're still in development though, not on production yet - I'll update our fork as things come up. Really excited to see Samza progressing and look forward to official Mesos support!

          criccomini Chris Riccomini added a comment -

          Zach Cox, thanks for this! Looking forward to integrating this stuff into Samza after SAMZA-348.

          drcrallen Charles Allen added a comment -

          So has the main code for this moved from Jon Bringhurst's GitHub to InnovaCo's as posted by Konstantin Alexandroff?

          criccomini Chris Riccomini added a comment -

          Charles Allen, yes, I'd recommend using InnovaCo's implementation.

          kostassoid Konstantin Alexandroff added a comment -

          I believe Banno's fork is even more advanced at this point (https://github.com/Banno/samza-mesos) as it supports Docker as well.

          zcox Zach Cox added a comment -

          Yeah we've fixed several things and added Docker support on the Banno fork. Note though that our fork is backported to use Samza 0.8.0 and does not use anything from 0.9.0 currently under development.

          It's still a work-in-progress, but we intend to have Samza jobs on our production Mesos cluster within the next few weeks using this framework. I'll report back here when we reach that point.
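
For anyone wondering how the Docker piece hooks into Mesos: since Mesos 0.20, a TaskInfo can carry a ContainerInfo that tells the slave to launch the task inside a Docker image. Roughly like the sketch below, with a placeholder image name rather than the exact code in the fork:

import org.apache.mesos.Protos.ContainerInfo
import org.apache.mesos.Protos.ContainerInfo.DockerInfo

object DockerExample {
  // Attach this to the TaskInfo so the slave runs the Samza container in Docker.
  val container = ContainerInfo.newBuilder()
    .setType(ContainerInfo.Type.DOCKER)
    .setDocker(DockerInfo.newBuilder().setImage("example/samza-job:latest"))
    .build()
  // ...then taskInfoBuilder.setContainer(container) when building the TaskInfo.
}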

          jonbringhurst Jon Bringhurst added a comment -

          Just so it's officially owned by Apache, I've attached a patch to this issue with the code I originally wrote. The contents are the same as:

          https://github.com/fintler/samza/compare/SAMZA-375.patch

          This patch isn't something we should commit to trunk without significant polish (such as the work done by Konstantin Alexandroff and Zach Cox). It's just here to make things official.

          criccomini Chris Riccomini added a comment -

          Timothy Chen, random question about Mesos+Samza: How does Mesos handle orphaned tasks? If I have Mesos running with a Samza task running on a machine, and there is a network partition, will the Mesos slave detect the partition, and kill everything?

          kostassoid Konstantin Alexandroff added a comment -

          Chris Riccomini, one question bothers me.

Is there any guaranteed behavior (when using YARN) regarding task commit (state/checkpoint) happening upon:
a. Normal task completion
b. Unhandled exception
c. Process termination?

Mesos normally terminates tasks by sending SIGTERM to the process root. We handle these signals using a shutdown hook in StreamJob to close Mesos tasks cleanly. But we do nothing for the Samza task containers, and they seem to just die abruptly (without a proper shutdown). I don't see the YARN adapter doing anything special regarding this either. Is this by design, or does YARN terminate tasks differently?
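
In sketch form, such a hook is just the following, where commitAndStop is a stand-in for whatever commits state/offsets and stops the run loop cleanly:

object GracefulShutdown {
  // Runs commitAndStop when the JVM receives SIGTERM (or exits normally).
  def install(commitAndStop: () => Unit): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit =
        try commitAndStop()
        catch { case e: Exception => e.printStackTrace() }
    })
}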

We would prefer tasks to commit upon process termination. This would allow us to update jobs as frequently as needed. If this is not the default behavior, then I suppose we shouldn't break it in the Mesos adapter. But what's the best way to override it at the project level? By implementing a custom SamzaContainer (with a custom RunLoop, perhaps)?

          zcox Zach Cox added a comment -

          Konstantin Alexandroff I added a comment to https://issues.apache.org/jira/browse/SAMZA-506 with info on how we are handling clean Samza container shutdown on Mesos.

          criccomini Chris Riccomini added a comment -

Both YARN and Mesos send SIGTERM now. Samza should shut down cleanly, but doesn't; SAMZA-506 is meant to address this.

          kostassoid Konstantin Alexandroff added a comment -

          Zach Cox, thank you, creative approach!

          zcox Zach Cox added a comment -

          Attached is a tarball of the latest master of https://github.com/banno/samza-mesos, which we've been running in production for several months. This code is being given to the ASF for potential use in Samza.

          drcrallen Charles Allen added a comment -

          Zach Cox: Awesome.

I looked over your branch a while ago and took another very brief peek at it. One key concern I have is with framework failover, or even rolling upgrades. Do you have any particular means of reconciliation or failover for the running Samza tasks?
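
(For reference: Mesos 0.21+ supports explicit task reconciliation, where a scheduler asks the master for the current state of tasks it believes it owns. A minimal sketch, assuming knownTaskIds is a set of task ids the framework persisted across restarts, e.g. in ZooKeeper:)

import org.apache.mesos.SchedulerDriver
import org.apache.mesos.Protos.{TaskID, TaskState, TaskStatus}
import scala.collection.JavaConverters._

object Reconciliation {
  def reconcile(driver: SchedulerDriver, knownTaskIds: Seq[String]): Unit = {
    val statuses = knownTaskIds.map { id =>
      TaskStatus.newBuilder()
        .setTaskId(TaskID.newBuilder().setValue(id))
        .setState(TaskState.TASK_RUNNING) // required field; ignored during reconciliation
        .build()
    }
    // The master replies with a statusUpdate() callback for each task.
    driver.reconcileTasks(statuses.asJava)
  }
}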

          zcox Zach Cox added a comment -

Charles Allen I wrote a nice long reply, but JIRA lost it. Currently there is no failover/reconciliation. This has been fine for us so far, since we make sure the containers commit/checkpoint on shutdown. That said, good failover/reconciliation should be added to this framework, which should be straightforward to do. I don't have time to help with this right now, but should be able to in the near future.

          kostassoid Konstantin Alexandroff added a comment -

There's a problem we discovered just recently because of this, when we upgraded the Mesos master in a multi-master cluster: the coordinator was running, but the task containers were lost. So it is indeed a rare case, but possible and frustrating. We're planning to fix this as well.
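
(For what it's worth, the usual Mesos fix for this is to register with a failover timeout and re-register under a persisted framework id, so the master keeps tasks running while the scheduler is away. A sketch, where savedId stands in for an id persisted somewhere like ZooKeeper on first registration:)

import org.apache.mesos.Protos.{FrameworkID, FrameworkInfo}

object FailoverExample {
  def frameworkInfo(savedId: Option[String]): FrameworkInfo = {
    val builder = FrameworkInfo.newBuilder()
      .setUser("")
      .setName("samza-mesos")
      .setFailoverTimeout(7 * 24 * 3600.0) // keep tasks alive for up to a week
    savedId.foreach(id => builder.setId(FrameworkID.newBuilder().setValue(id)))
    builder.build()
  }
}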

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

Zach Cox, awesome work and thanks for posting the tarball back here! Is the tarball still based on Samza 0.8.0?

          zcox Zach Cox added a comment -

Yi Pan (Data Infrastructure) yes, it's Samza 0.8.0; we haven't upgraded to 0.9.0 yet.

          jayson.minard Jayson Minard added a comment -

Not sure which fork is best for the 0.9.0 final release. InnovaCo's is on 0.9.0-SNAPSHOT and is probably closer; Banno's is on 0.8.0 but adds Docker. Am I correct about the current state, with no updates or work done since?

          kostassoid Konstantin Alexandroff added a comment -

Jayson Minard We (InnovaCo) currently use Samza based on commit 5a035b8c4a4b1bd0ccf4d3df25cc1ce51a8bd06f, and our samza-mesos version is compatible up to this point. Not sure if it'll work with the current SNAPSHOT; that requires some testing.

          zcox Zach Cox added a comment -

          Jayson Minard you are correct re: Banno fork - it's currently on Samza 0.8.0 and has support for Docker. We have an upcoming card to upgrade all of our Samza usage to 0.9.0, which will include samza-mesos. Someone should be working on that in the next few weeks.


People

  • Assignee: jonbringhurst Jon Bringhurst
  • Reporter: jonbringhurst Jon Bringhurst
  • Votes: 13
  • Watchers: 22
