Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-4868

Twitter DStream.map() throws "Task not serializable"

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 1.1.1, 1.2.0
    • Fix Version/s: None
    • Component/s: DStreams, Spark Shell
    • Labels:
    • Environment:

      Description

      (Continuing the discussion started here on the Spark user list.)

      The following Spark Streaming code throws a serialization exception I do not understand.

      import twitter4j.auth.{Authorization, OAuthAuthorization}
      import twitter4j.conf.ConfigurationBuilder 
      import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
      import org.apache.spark.streaming.twitter.TwitterUtils
      
      /** Publishes the four placeholder OAuth settings as JVM system properties,
        * then wraps a freshly built twitter4j configuration in an OAuthAuthorization.
        *
        * NOTE(review): presumably ConfigurationBuilder picks the credentials up from
        * these system properties -- confirm against the twitter4j documentation.
        *
        * @return always a `Some` holding the new authorization
        */
      def getAuth(): Option[Authorization] = {
        val oauthSettings = Seq(
          "twitter4j.oauth.consumerKey" -> "consumerKey",
          "twitter4j.oauth.consumerSecret" -> "consumerSecret",
          "twitter4j.oauth.accessToken" -> "accessToken",
          "twitter4j.oauth.accessTokenSecret" -> "accessTokenSecret"
        )
        // Properties are written, in this order, before the configuration is built.
        oauthSettings.foreach { case (key, value) => System.setProperty(key, value) }

        val configuration = new ConfigurationBuilder().build()
        Some(new OAuthAuthorization(configuration))
      }
      
      /** Identity helper: hands its argument back untouched. */
      def noop(a: Any): Any = a
      
      // Streaming context created from the existing `sc`, with Seconds(5) as its batch duration.
      val ssc = new StreamingContext(sc, Seconds(5))
      // Stream created through TwitterUtils using the authorization returned by getAuth().
      val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
      // Keep only the result of getText for each element of the stream.
      val liveTweets = liveTweetObjects.map(_.getText)
      
      // This map() call is where "Task not serializable" is thrown, before ssc.start() runs.
      // NOTE(review): the reported cause is a NotSerializableException for StreamingContext;
      // presumably the closure calling noop drags in the shell wrapper that holds `ssc` -- unconfirmed.
      liveTweets.map(t => noop(t)).print()  // exception here
      
      // Not reached in the reported session: the exception above occurs first.
      ssc.start()
      

      So before I even start the StreamingContext, I get the following stack trace:

      scala> liveTweets.map(t => noop(t)).print()
      org.apache.spark.SparkException: Task not serializable
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
      	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
      	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
      	at $iwC$$iwC$$iwC.<init>(<console>:32)
      	at $iwC$$iwC.<init>(<console>:34)
      	at $iwC.<init>(<console>:36)
      	at <init>(<console>:38)
      	at .<init>(<console>:42)
      	at .<clinit>(<console>)
      	at .<init>(<console>:7)
      	at .<clinit>(<console>)
      	at $print(<console>)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
      	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
      	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
      	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
      	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
      	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
      	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
      	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
      	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
      	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
      	at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
      	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
      	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
      	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
      	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
      	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
      	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
      	at org.apache.spark.repl.Main$.main(Main.scala:31)
      	at org.apache.spark.repl.Main.main(Main.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
      	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
      	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
      	... 43 more
      

      What I'm really trying to do is use Spark Streaming via the interactive shell to filter Tweets using a trained KMeans model. I got errors trying that, and I boiled it down to this repro.

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              nchammas Nicholas Chammas
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: