Spark / SPARK-36917

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD


Details

    Description

      My Spark job fails with this error:

      org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (davben-lubuntu executor 2): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
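
      For readers unfamiliar with the type named in the exception: java.lang.invoke.SerializedLambda is the stand-in object that Java serialization writes in place of a serializable lambda. The following Spark-free sketch only illustrates that type; it does not reproduce or diagnose the failure reported here. The names SerializedLambdaDemo, SerFunction and serializedForm are hypothetical and chosen for illustration.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class SerializedLambdaDemo {
    // Hypothetical helper interface: a Function that is also Serializable.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    // Returns the object that serialization would write in place of the lambda.
    // Serializable lambdas carry a synthetic private writeReplace() method.
    static Object serializedForm(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("not a serializable lambda", e);
        }
    }

    public static void main(String[] args) {
        SerFunction<String, String> upper = s -> s.toUpperCase();
        Object form = serializedForm(upper);
        // Print the runtime type of the serialized form.
        System.out.println(form.getClass().getName());
        if (form instanceof SerializedLambda) {
            SerializedLambda sl = (SerializedLambda) form;
            // The class in which the lambda was defined, in internal (slash-separated) form.
            System.out.println(sl.getCapturingClass());
        }
    }
}
```

      On deserialization, the JVM is expected to resolve this stand-in back into a function object of the original interface type; the exception above reports that the stand-in itself was assigned to a field of type scala.Function3.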

      My machine runs Ubuntu 20 Linux with two users, davben (/home/davben) and hadoop (/home/hadoop). Under the hadoop user I have installed Hadoop 3.1 and spark-3.1.2-hadoop3.2. Both users use the same java-8-openjdk installation. The Spark job is launched by user davben from the Eclipse IDE as follows.
      First I create the Spark configuration and the Spark session:

       

      // Requires org.apache.spark.SparkConf and org.apache.spark.sql.SparkSession.
      System.setProperty("hadoop.home.dir", "/home/hadoop/hadoop");
      SparkConf sparkConf = new SparkConf()
          .setAppName("simple")
          .setMaster("yarn")
          .set("spark.executor.memory", "1g")
          // Kept as reported; the documented Spark property is "spark.submit.deployMode".
          .set("deploy.mode", "cluster")
          .set("spark.yarn.stagingDir", "hdfs://localhost:9000/user/hadoop/")
          .set("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
          .set("spark.hadoop.yarn.resourcemanager.hostname", "localhost")
          .set("spark.hadoop.yarn.resourcemanager.scheduler.address", "localhost:8030")
          .set("spark.hadoop.yarn.resourcemanager.address", "localhost:8032")
          .set("spark.hadoop.yarn.resourcemanager.webapp.address", "localhost:8088")
          .set("spark.hadoop.yarn.resourcemanager.admin.address", "localhost:8083");
      SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

      Then I create a dataset with two entries:

       

      List<Row> rows = new ArrayList<>();
      rows.add(RowFactory.create("a", "b"));
      rows.add(RowFactory.create("a", "a"));
      StructType structType = new StructType();
      structType = structType.add("edge_1", DataTypes.StringType, false);
      structType = structType.add("edge_2", DataTypes.StringType, false);
      ExpressionEncoder<Row> edgeEncoder = RowEncoder.apply(structType);
      Dataset<Row> edge = spark.createDataset(rows, edgeEncoder);
      

      Then I print the content of the dataset edge:

      edge.show();
      

       

      Then I apply a map transformation to edge that upper-cases both values of each row and stores the result in edge2:

      Dataset<Row> edge2 = edge.map(new MyFunction2(), edgeEncoder);

      The following is the code of MyFunction2:

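      import org.apache.spark.api.java.function.MapFunction;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;

      public class MyFunction2 implements MapFunction<Row, Row>, scala.Serializable {
          private static final long serialVersionUID = 1L;

          // Upper-cases both string fields of the input row.
          @Override
          public Row call(Row v1) throws Exception {
              String el1 = v1.get(0).toString().toUpperCase();
              String el2 = v1.get(1).toString().toUpperCase();
              return RowFactory.create(el1, el2);
          }
      }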
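
      For reference, the result the map step is expected to produce can be sketched without Spark. This is a plain-Java stand-in, assuming each row is modelled as a two-element String array; UpperCaseRowsSketch and upperCase are hypothetical names and are not part of the failing job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UpperCaseRowsSketch {
    // Plain-Java stand-in for MyFunction2.call: upper-cases both fields of a row.
    static String[] upperCase(String[] row) {
        return new String[] { row[0].toUpperCase(), row[1].toUpperCase() };
    }

    public static void main(String[] args) {
        // Same two entries as the dataset in the report.
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] { "a", "b" });
        rows.add(new String[] { "a", "a" });
        for (String[] row : rows) {
            System.out.println(Arrays.toString(upperCase(row)));
        }
        // prints [A, B] then [A, A]
    }
}
```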

      Finally I show the content of edge2:

      edge2.show();
      

      Checking the Hadoop UI at localhost:8088, I can confirm that the job is submitted correctly. What seems strange is that the first show() prints its output correctly in my console, but the second one fails with the error above.

       

       

       

       

          People

            Assignee: Unassigned
            Reporter: Davide Benedetto (davvy93)