  1. Beam
  2. BEAM-4597

Serialization problem when using the SparkRunner with Spark's KryoSerializer

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: Not applicable
    • Component/s: runner-spark
    • Labels: None

    Description

      When the pipeline runs on the SparkRunner and Spark is configured to use the KryoSerializer, as in:

      spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.KryoSerializer /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner

      the job fails after 10 to 15 seconds with the following exception:

      Exception in thread "main" java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
      Serialization trace:
      factory (org.apache.beam.runners.core.metrics.MetricsMap)
      counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
      metricsContainers (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
      metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
      at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
      at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
      at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
      at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
      at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
      at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
      at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
      at org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
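
      The class named in the exception (MetricsContainerImpl$$Lambda$31/1875283985) is a JVM-generated lambda class. Such classes generally cannot be loaded again from their name, which would match the "Unable to find class" message from a serializer that records class names. The following is a minimal plain-Java sketch of that behaviour. It does not use Kryo or Beam, and the class and method names (LambdaClassLookup, resolvableByName) are hypothetical.

```java
import java.util.function.Supplier;

public class LambdaClassLookup {

    /** Returns true if the class can be loaded again purely from its name. */
    public static boolean resolvableByName(Class<?> type) {
        try {
            Class.forName(type.getName(), false, type.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The name does not correspond to a class any loader can find.
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a lambda stored in a field, like the 'factory' in the trace.
        Supplier<String> factory = () -> "counter";
        Class<?> lambdaClass = factory.getClass();

        // The synthetic name contains "$$Lambda"; the exact suffix varies per JVM run.
        System.out.println(lambdaClass.getName());
        // Looking the lambda class up by name is expected to fail.
        System.out.println(resolvableByName(lambdaClass));
        // An ordinary class resolves as usual.
        System.out.println(resolvableByName(String.class));
    }
}
```

      Because the synthetic name is only meaningful inside the JVM that created it, another JVM (for example the driver reading a task result) has no way to resolve it.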

      However, when the same pipeline runs on the SparkRunner with Spark configured to use the JavaSerializer, as in:

      spark-submit --class org.apache.beam.examples.BugWithKryoOnSpark --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.JavaSerializer /tmp/kafka-sdk-beam-example-bundled-0.1.jar --runner=SparkRunner

      The pipeline works correctly.
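
      One plausible reason for the difference is that Java serialization has built-in support for lambdas whose target interface is Serializable: it writes a java.lang.invoke.SerializedLambda description instead of the synthetic class name. The sketch below round-trips such a lambda in plain Java. It assumes the factory interface is Serializable, which this report does not confirm for Beam, and the names SerializableLambdaRoundTrip, SerializableSupplier, sampleFactory and roundTrip are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class SerializableLambdaRoundTrip {

    /** Hypothetical stand-in for a factory interface that is also Serializable. */
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    /** A lambda whose target type is Serializable, so javac emits deserialization support. */
    public static SerializableSupplier<String> sampleFactory() {
        return () -> "counter";
    }

    /** Serializes a value with Java serialization and reads it back. */
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SerializableSupplier<String> copy = roundTrip(sampleFactory());
        System.out.println(copy.get()); // prints "counter"
    }
}
```

      The restored lambda is recreated through its capturing class rather than looked up by its generated name, so the synthetic class name never has to be resolved.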

      Our deployment consists of CDH 5.14.2 (Parcels) and Spark2:

      spark-submit --version
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
            /_/

      Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_151
      Branch HEAD
      Compiled by user jenkins on 2018-04-10T23:08:17Z
      Revision 9f5baab06f127486a030024877fc13a3992f100f
      Url git://github.mtv.cloudera.com/CDH/spark.git
      Type --help for more information.

      I have attached a sample Maven project that reproduces this bug: it reads data from Kafka (localhost) and simply echoes the incoming data. Please refer to its README for the full stack trace and for instructions on how to build the sample.

       

            People

              Assignee: Unassigned
              Reporter: JC (jcgarciam)
              Votes: 0
              Watchers: 4