Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-493

Workers don't inherit storm.conf.file/storm.options properties of the supervisor

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 0.9.2-incubating, 0.9.3
    • 0.9.3
    • storm-core
    • None

    Description

      If we override some configuration parameters on the command line (using storm -c "param=value") when we launch the supervisor, workers don't inherit them.

      > cat conf/storm.yaml
      storm.zookeeper.servers:
           - "127.0.0.1"
      nimbus.host: "127.0.0.1"
      storm.zookeeper.root: "/stormtest"
      storm.local.dir: "storm-local-main"
      
      > python bin/storm -c "storm.local.dir=\"storm-local-custom\"" supervisor
      
      > less logs/worker-6701.log
      [...]
      2014-09-10 09:35:00 o.a.s.z.s.ZooKeeperServer [INFO] Server environment:user.dir=/optc/2014-09-05-5aae7686
      2014-09-10 09:35:01 b.s.d.worker [INFO] Launching worker for mytopo-1-1410334488 on 96f32da2-2043-4371-988b-ec9ca107ce69:6701 with id b9178c80-922b-4b8d-9984-7cfaf06f3c86 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "storm-local-main", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "127.0.0.1", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/stormtest", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["127.0.0.1"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", 
"storm.cluster.mode" "distributed", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
      2014-09-10 09:35:01 b.s.util [DEBUG] Touching file at storm-local-main/workers/b9178c80-922b-4b8d-9984-7cfaf06f3c86/pids/30958
      2014-09-10 09:35:01 b.s.d.worker [ERROR] Error on initialization of server mk-worker
      java.io.IOException: No such file or directory
      	at java.io.UnixFileSystem.createFileExclusively(Native Method) ~[na:1.7.0_60]
      	at java.io.File.createNewFile(File.java:1006) ~[na:1.7.0_60]
      	at backtype.storm.util$touch.invoke(util.clj:519) ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at backtype.storm.daemon.worker$fn__6535$exec_fn__1474__auto____6536.invoke(worker.clj:362) ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      2014-09-10 09:35:01 b.s.util [ERROR] Halting process: ("Error on initialization")
      java.lang.RuntimeException: ("Error on initialization")
      	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      [...]
      

      The worker tries to use the local directory defined in the configuration file (storm-local-main) instead of the local directory defined on the command line (storm-local-custom).


      There is a similar problem with the configuration file name. If we specify it on the command line, workers don't inherit it and instead search for the default "storm.yaml" file on the classpath.

      > cat conf/storm.yaml
      storm.zookeeper.servers:
           - "127.0.0.1"
      nimbus.host: "127.0.0.1"
      storm.zookeeper.root: "/stormtest"
      storm.local.dir: "storm-local-default"
      
      > cat conf/storm-custom.yaml
      storm.zookeeper.servers:
           - "127.0.0.1"
      nimbus.host: "127.0.0.1"
      storm.zookeeper.root: "/stormtest"
      storm.local.dir: "storm-local-custom"
      
      > python bin/storm --config storm-custom.yaml supervisor
      
      > less logs/worker-6700.log
      [...]
      2014-09-10 10:16:01 o.a.s.z.s.ZooKeeperServer [INFO] Server environment:user.dir=/optc/2014-09-05-5aae7686
      2014-09-10 10:16:01 b.s.d.worker [INFO] Launching worker for mytopo-1-1410336834 on 38c99fbd-e5e5-4c53-b704-7ec460f5e227:6700 with id 2133bad6-432d-4c63-a156-1184064d296b and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "storm-local-default", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "127.0.0.1", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/stormtest", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["127.0.0.1"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", 
"storm.cluster.mode" "distributed", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
      2014-09-10 10:16:01 b.s.util [DEBUG] Touching file at storm-local-default/workers/2133bad6-432d-4c63-a156-1184064d296b/pids/31689
      2014-09-10 10:16:01 b.s.d.worker [ERROR] Error on initialization of server mk-worker
      java.io.IOException: No such file or directory
      	at java.io.UnixFileSystem.createFileExclusively(Native Method) ~[na:1.7.0_60]
      	at java.io.File.createNewFile(File.java:1006) ~[na:1.7.0_60]
      	at backtype.storm.util$touch.invoke(util.clj:519) ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at backtype.storm.daemon.worker$fn__6535$exec_fn__1474__auto____6536.invoke(worker.clj:362) ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      2014-09-10 10:16:01 b.s.util [ERROR] Halting process: ("Error on initialization")
      java.lang.RuntimeException: ("Error on initialization")
      	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:319) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$fn__6535$mk_worker__6591.doInvoke(worker.clj:354) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker$_main.invoke(worker.clj:461) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      	at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na]
      	at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na]
      	at backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
      

      The worker tries to use the local directory "storm-local-default" defined in the configuration file "storm.yaml" instead of using the local directory "storm-local-custom" defined in the file "storm-custom.yaml" specified on the command line.

      Attachments

        Activity

          People

            chrisz Christophe Carré
            chrisz Christophe Carré
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: