Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-637

Support Java serialisation (via JavaSerialiser)

Details

    • Improvement
    • Status: Resolved
    • P3
    • Resolution: Won't Fix
    • None
    • Not applicable
    • runner-spark
    • None

    Description

      When using JavaSerializer instead of KryoSerializer by configuring JavaSparkContext with "spark.serializer" set to JavaSerializer.class.getCanonicalName(), an exception is thrown:

      object not serializable (class: org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
      
      Serialization stack:
      	- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
      	at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
      	at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
      	at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
      	at org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:175)
      	at org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:172)
      	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
      	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
      	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
      	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
      	at scala.util.Try$.apply(Try.scala:161)
      	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
      

      This issue is reproducible with the SimpleStreamingWordCountTest#testRun test (given JavaSerialiser is configured).

      Attachments

        Activity

          People

            Unassigned Unassigned
            staslev Stas Levin
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: