  1. Apache Storm
  2. STORM-113

TridentUtils.thriftDeserialize() is used unsafely when running in local mode.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.2-incubating
    • Component/s: storm-core
    • Labels:
      None

      Description

      https://github.com/nathanmarz/storm/issues/500

      When running a Trident topology with multiple workers in local mode, each worker runs as a thread in a single JVM. These worker threads call TridentUtils.thriftDeserialize() concurrently when loading the (serialized) topology. However, this method is not thread-safe: it uses a static reference to a TDeserializer, which in turn uses an instance of TMemoryInputTransport, which is stateful. The result is sporadic, nondeterministic Thrift exceptions when starting the workers.
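
To illustrate the hazard, here is a minimal sketch that replays one bad interleaving deterministically in a single thread. It does not use Thrift: StatefulReader, SHARED, and readWithInterleaving are hypothetical stand-ins for a shared deserializer whose input buffer is mutable state, not the real TDeserializer or TMemoryInputTransport.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a shared, stateful deserializer; NOT the Thrift API.
final class SharedReaderDemo {
    /** Keeps the current input buffer as mutable state, like an in-memory transport. */
    static final class StatefulReader {
        private ByteBuffer input;

        void reset(byte[] bytes) {
            input = ByteBuffer.wrap(bytes);
        }

        int readInt() {
            return input.getInt();
        }
    }

    // One instance shared by every caller, mirroring a static reference.
    static final StatefulReader SHARED = new StatefulReader();

    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Replays, in one thread, the order of calls that two racing workers could produce. */
    static int readWithInterleaving(int mine, int theirs) {
        SHARED.reset(encode(mine));   // worker A points the reader at its bytes
        SHARED.reset(encode(theirs)); // worker B resets the reader before A has read
        return SHARED.readInt();      // worker A now reads worker B's bytes
    }

    public static void main(String[] args) {
        System.out.println(readWithInterleaving(1, 2)); // prints 2, not 1
    }
}
```

With real threads the reset can also land in the middle of a read, which is consistent with the reader seeing garbage such as a negative length.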

      This is not difficult to fix, but I'm not sure which approach the committers want to take. I made the deserializer (and serializer) ThreadLocal variables.
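
The ThreadLocal approach can be sketched as follows. This is an illustration under assumptions, not the actual patch: StatefulReader is a hypothetical stand-in for the Thrift deserializer, and the helper names are invented for the example. Each thread lazily gets its own instance, so no mutable state is shared between workers.

```java
import java.nio.ByteBuffer;

// Illustrative only: StatefulReader stands in for a non-thread-safe deserializer.
final class ThreadLocalReaderDemo {
    static final class StatefulReader {
        private ByteBuffer input; // mutable state that must not be shared across threads

        int deserialize(byte[] bytes) {
            input = ByteBuffer.wrap(bytes);
            return input.getInt();
        }
    }

    // Each thread that calls READER.get() lazily receives its own instance.
    private static final ThreadLocal<StatefulReader> READER = new ThreadLocal<StatefulReader>() {
        @Override
        protected StatefulReader initialValue() {
            return new StatefulReader();
        }
    };

    static int deserialize(byte[] bytes) {
        return READER.get().deserialize(bytes);
    }

    /** Returns true if another thread is handed a different reader instance than this one. */
    static boolean readersDifferAcrossThreads() {
        final StatefulReader[] seen = new StatefulReader[1];
        Thread other = new Thread(new Runnable() {
            @Override
            public void run() {
                seen[0] = READER.get();
            }
        });
        other.start();
        try {
            other.join(); // join() makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0] != null && seen[0] != READER.get();
    }

    public static void main(String[] args) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(deserialize(bytes));           // prints 42
        System.out.println(readersDifferAcrossThreads()); // prints true
    }
}
```

The trade-off is one instance per thread rather than one per process. Creating a new instance per call or synchronizing access to the shared one are the other obvious options.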

      ----------
      juhoautio: Thanks for explaining the problem! This issue made my test fail* occasionally when trying to submit a topology to a local cluster.

      For now I have set the number of workers and the max task parallelism to 1, but eventually I'd like to enable some parallelism. I'm not sure whether both parameters matter, but here's a code extract for reference:

      conf = new Config();
      conf.setNumWorkers(1);
      conf.setMaxTaskParallelism(1);
      // ...
      cluster = new LocalCluster();
      cluster.submitTopology(.., conf, ..);

      With this configuration the problem hasn't happened any more.

      *) Stack trace for the error (happens occasionally when the number of workers is set to 2):

      2629 [Thread-9] ERROR backtype.storm.daemon.worker - Error on initialization of server mk-worker
      java.lang.RuntimeException: org.apache.thrift7.TException: Negative length: -72553216
      at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:111)
      at storm.trident.planner.PartitionNode.readObject(PartitionNode.java:33)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
      at java.util.HashMap.readObject(HashMap.java:1183)
      at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
      at backtype.storm.utils.Utils.deserialize(Utils.java:64)
      at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:199)
      at backtype.storm.daemon.task$get_task_object.invoke(task.clj:56)
      at backtype.storm.daemon.task$mk_task_data$fn__3766.invoke(task.clj:158)
      at backtype.storm.util$assoc_apply_self.invoke(util.clj:731)
      at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:152)
      at backtype.storm.daemon.task$mk_task.invoke(task.clj:163)
      at backtype.storm.daemon.executor$mk_executor$fn__3922.invoke(executor.clj:267)
      at clojure.core$map$fn__4087.invoke(core.clj:2432)
      at clojure.lang.LazySeq.sval(LazySeq.java:42)
      at clojure.lang.LazySeq.seq(LazySeq.java:60)
      at clojure.lang.RT.seq(RT.java:473)
      at clojure.core$seq.invoke(core.clj:133)
      at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
      at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
      at clojure.core.protocols$fn_5828$G5823_5841.invoke(protocols.clj:13)
      at clojure.core$reduce.invoke(core.clj:6030)
      at clojure.core$into.invoke(core.clj:6077)
      at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:267)
      at backtype.storm.daemon.worker$fn_4348$exec_fn1228auto__4349$iter43544358$fn_4359.invoke(worker.clj:354)
      at clojure.lang.LazySeq.sval(LazySeq.java:42)
      at clojure.lang.LazySeq.seq(LazySeq.java:60)
      at clojure.lang.Cons.next(Cons.java:39)
      at clojure.lang.LazySeq.next(LazySeq.java:92)
      at clojure.lang.RT.next(RT.java:587)
      at clojure.core$next.invoke(core.clj:64)
      at clojure.core$dorun.invoke(core.clj:2726)
      at clojure.core$doall.invoke(core.clj:2741)
      at backtype.storm.daemon.worker$fn_4348$exec_fn1228auto___4349.invoke(worker.clj:354)
      at clojure.lang.AFn.applyToHelper(AFn.java:185)
      at clojure.lang.AFn.applyTo(AFn.java:151)
      at clojure.core$apply.invoke(core.clj:601)
      at backtype.storm.daemon.worker$fn_4348$mk_worker_4404.doInvoke(worker.clj:323)
      at clojure.lang.RestFn.invoke(RestFn.java:512)
      at backtype.storm.daemon.supervisor$fn__4807.invoke(supervisor.clj:467)
      at clojure.lang.MultiFn.invoke(MultiFn.java:177)
      at backtype.storm.daemon.supervisor$sync_processes$iter_46844688$fn_4689.invoke(supervisor.clj:249)
      at clojure.lang.LazySeq.sval(LazySeq.java:42)
      at clojure.lang.LazySeq.seq(LazySeq.java:60)
      at clojure.lang.RT.seq(RT.java:473)
      at clojure.core$seq.invoke(core.clj:133)
      at clojure.core$dorun.invoke(core.clj:2725)
      at clojure.core$doall.invoke(core.clj:2741)
      at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
      at clojure.lang.AFn.applyToHelper(AFn.java:161)
      at clojure.lang.AFn.applyTo(AFn.java:151)
      at clojure.core$apply.invoke(core.clj:603)
      at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
      at clojure.lang.RestFn.invoke(RestFn.java:397)
      at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
      at clojure.lang.AFn.run(AFn.java:24)
      at java.lang.Thread.run(Thread.java:724)
      Caused by: org.apache.thrift7.TException: Negative length: -72553216
      at org.apache.thrift7.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:388)
      at org.apache.thrift7.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
      at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:102)
      at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:60)
      at backtype.storm.generated.Grouping.readValue(Grouping.java:353)
      at org.apache.thrift7.TUnion.read(TUnion.java:135)
      at org.apache.thrift7.TDeserializer.deserialize(TDeserializer.java:69)
      at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:108)
      ... 79 more
      2653 [Thread-9] INFO backtype.storm.util - Halting process: ("Error on initialization")

            People

            • Assignee:
              supercargo Adam Lewis
              Reporter:
              xumingming James Xu