Spark / SPARK-18560

Receiver data can not be deserialized properly.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 2.0.2
    • Fix Version/s: 2.0.3, 2.1.0
    • Component/s: DStreams
    • Labels: None

    Description

      My Spark Streaming job runs correctly on Spark 1.6.1, but it fails on Spark 2.0.1 with the following exception:

      16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87)
      com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
      	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
      	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
      	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      Digging into the relevant implementation, I find that the type of the data received by the Receiver is erased. In Spark 2.x, the framework chooses an appropriate serializer, either JavaSerializer or KryoSerializer, based on the type of the data.

      On the Receiver side, the data type is erased to Object, so the framework chooses JavaSerializer, per the following code:

      def canUseKryo(ct: ClassTag[_]): Boolean = {
        primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
      }

      def getSerializer(ct: ClassTag[_]): Serializer = {
        if (canUseKryo(ct)) {
          kryoSerializer
        } else {
          defaultSerializer
        }
      }
      

      On the task side, the correct data type is available, and the framework chooses KryoSerializer when possible, i.e. for the following supported types:

      private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
      private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
        val primitiveClassTags = Set[ClassTag[_]](
          ClassTag.Boolean,
          ClassTag.Byte,
          ClassTag.Char,
          ClassTag.Double,
          ClassTag.Float,
          ClassTag.Int,
          ClassTag.Long,
          ClassTag.Null,
          ClassTag.Short
        )
        val arrayClassTags = primitiveClassTags.map(_.wrap)
        primitiveClassTags ++ arrayClassTags
      }
      

      In my case, the data type is Array[Byte], which is in the supported set above, so the task side deserializes with KryoSerializer while the Receiver side has already serialized the blocks with JavaSerializer, which produces the exception above.
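
      This mismatch can be demonstrated outside Spark. Below is a minimal, self-contained sketch (not Spark's actual SerializerManager, and assuming the erased Receiver-side tag behaves like ClassTag[AnyRef]): canUseKryo returns false for the erased tag but true for the real element type Array[Byte], so the two sides pick different serializers for the same blocks.

      import scala.reflect.{classTag, ClassTag}

      object SerializerChoiceSketch {
        // Same supported-type set as quoted above; ClassTags compare by runtimeClass,
        // so classTag[Array[Byte]] matches ClassTag.Byte.wrap in the Set.
        private val stringClassTag: ClassTag[String] = classTag[String]
        private val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
          val primitiveClassTags = Set[ClassTag[_]](
            ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
            ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short)
          primitiveClassTags ++ primitiveClassTags.map(_.wrap)
        }

        def canUseKryo(ct: ClassTag[_]): Boolean =
          primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag

        def main(args: Array[String]): Unit = {
          // Receiver side: the element type has been erased, only AnyRef is visible.
          println(canUseKryo(classTag[AnyRef]))      // false -> JavaSerializer
          // Task side: the real element type Array[Byte] is known from the ClassTag.
          println(canUseKryo(classTag[Array[Byte]])) // true  -> KryoSerializer
        }
      }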

      This problem stems from SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs.
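
      For reference, a hypothetical minimal reproduction is sketched below (the host, port, length-prefixed record format, and the ByteArrayStreamRepro/bytesIterator names are assumptions, not taken from the original job): a receiver-based socketStream of Array[Byte], stored serialized by the receiver, followed by count(), which is where the stack trace above is thrown.

      import java.io.{DataInputStream, InputStream}

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object ByteArrayStreamRepro {
        // Reads length-prefixed byte records from the socket (EOF handling omitted).
        def bytesIterator(in: InputStream): Iterator[Array[Byte]] = {
          val din = new DataInputStream(in)
          Iterator.continually {
            val len = din.readInt()
            val buf = new Array[Byte](len)
            din.readFully(buf)
            buf
          }
        }

        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("SPARK-18560-repro")
          val ssc = new StreamingContext(conf, Seconds(5))
          // Blocks are stored serialized by the receiver, then read back by the count() tasks.
          val stream = ssc.socketStream[Array[Byte]](
            "localhost", 9999, bytesIterator, StorageLevel.MEMORY_ONLY_SER)
          stream.foreachRDD(rdd => println(s"received ${rdd.count()} records"))
          ssc.start()
          ssc.awaitTermination()
        }
      }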

      Attachments

        Issue Links

        Activity


          People

            Assignee: Unassigned
            Reporter: Genmao Yu (uncleGen)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated:
              Resolved:
