[SPARK-20254] SPARK-19716 generates unnecessary data conversion for Dataset with primitive array


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.0
    • Component/s: SQL
    • Labels: None

    Description

      Since unresolvedmapobjects was newly introduced by SPARK-19716, the current implementation generates mapobjects() at DeserializeToObject in the Analyzed Logical Plan. This mapobjects() introduces generated Java code that copies the array element by element into a GenericArrayData instead of reusing the primitive array.
      cc: Wenchen Fan

      // Repro: cache a Dataset of primitive double arrays, then map the identity
      // function over it (assumes sparkContext and spark.implicits._ are in scope,
      // as in Spark's test suites; in spark-shell use spark.sparkContext)
      val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
      ds.count
      val ds2 = ds.map(e => e)
      ds2.explain(true)
      ds2.show
      
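      To make the cost concrete, below is a minimal standalone Scala sketch (not Spark's actual implementation; bulkPath and mapObjectsPath are hypothetical names for the two codegen strategies) of deserializing a primitive double array. The pre-SPARK-19716 path performs one bulk conversion, while the mapobjects() path boxes every element before the final unboxing.

      // Minimal sketch in plain Scala, assuming nothing from Spark.
      object ConversionSketch {
        // Before SPARK-19716: equivalent to UnsafeArrayData.toDoubleArray --
        // a single bulk copy of the primitive storage, no boxing.
        def bulkPath(input: Array[Double]): Array[Double] =
          java.util.Arrays.copyOf(input, input.length)

        // After SPARK-19716: mapobjects() loops over the elements, boxing each
        // one into java.lang.Double (cf. the generated loop shown below).
        def mapObjectsPath(input: Array[Double]): Array[Double] = {
          val boxed = new Array[java.lang.Double](input.length)
          var i = 0
          while (i < input.length) {
            boxed(i) = input(i)      // per-element box
            i += 1
          }
          boxed.map(_.doubleValue)   // unboxed again, i.e. toDoubleArray
        }

        def main(args: Array[String]): Unit = {
          val a = Array(1.1, 2.2)
          assert(bulkPath(a).sameElements(mapObjectsPath(a)))
        }
      }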

      Plans before SPARK-19716

      == Parsed Logical Plan ==
      'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Analyzed Logical Plan ==
      value: array<double>
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Optimized Logical Plan ==
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
            +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                     +- Scan ExternalRDDScan[obj#1]
      
      == Physical Plan ==
      *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- *MapElements <function1>, obj#24: [D
         +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
            +- *InMemoryTableScan [value#2]
                  +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                           +- Scan ExternalRDDScan[obj#1]
      
      

      Plans after SPARK-19716

      == Parsed Logical Plan ==
      'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Analyzed Logical Plan ==
      value: array<double>
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Optimized Logical Plan ==
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
            +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                     +- Scan ExternalRDDScan[obj#1]
      
      == Physical Plan ==
      *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- *MapElements <function1>, obj#24: [D
         +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
            +- InMemoryTableScan [value#2]
                  +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                           +- Scan ExternalRDDScan[obj#1]
      Compared with the plans before SPARK-19716, DeserializeToObject now wraps value#2 in mapobjects(...) before calling toDoubleArray. The corresponding generated Java code is shown below; the /*###*/ comment marks the extra copy into GenericArrayData.
      ...
      /* 056 */       ArrayData deserializetoobject_value1 = null;
      /* 057 */
      /* 058 */       if (!inputadapter_isNull) {
      /* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
      /* 060 */
      /* 061 */         Double[] deserializetoobject_convertedArray = null;
      /* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
      /* 063 */
      /* 064 */         int deserializetoobject_loopIndex = 0;
      /* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
      /* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
      /* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
      /* 068 */
      /* 069 */           if (MapObjects_loopIsNull2) {
      /* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
      /* 071 */           }
      /* 072 */           if (false) {
      /* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
      /* 074 */           } else {
      /* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
      /* 076 */           }
      /* 077 */
      /* 078 */           deserializetoobject_loopIndex += 1;
      /* 079 */         }
      /* 080 */
      /* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
      /* 082 */       }
      ...
      
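      For reference, generated code like the excerpt above can be dumped with the debug helper (a sketch; assumes a Spark 2.x session and the ds2 from the snippet above):

      import org.apache.spark.sql.execution.debug._
      ds2.debugCodegen()   // prints the whole-stage generated Java code per subtree

      Since value#2 is backed by UnsafeArrayData and the element type is a non-nullable primitive, the deserializer could invoke toDoubleArray on the input ArrayData directly, as the pre-SPARK-19716 plan does, instead of going through the boxing loop and the GenericArrayData copy above.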


      People

        Assignee: Kazuaki Ishizaki
        Reporter: Kazuaki Ishizaki
        Votes: 0
        Watchers: 3
