Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-32159

New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization



    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.0.0
    • 3.0.1, 3.1.0
    • SQL
    • None


      The new user defined aggregator feature (SPARK-27296) based on calling 'functions.udaf(aggregator)' works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an array, like 'Aggregator[Array[Double], _, _]',  it is tripping over the following:


      • When constructing [[MapObjects]], the element type must be given, which may not be available
      • before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
      • [[MapObjects]] during analysis after the input data is resolved.
      • Note that, ideally we should not serialize and send unresolved expressions to executors, but
      • users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
      • Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
      • not serializable. Then even users mistakenly reference unresolved expression and serialize it,
      • it's just a performance issue(more network traffic), and will not fail.
        case class UnresolvedMapObjects(
        @transient function: Expression => Expression,
        child: Expression,
        customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
        override lazy val resolved = false

      override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse

      { throw new UnsupportedOperationException("not resolved") }



      The '@transient' is causing the function to be unpacked as 'null' over on the executors, and it is causing a null-pointer exception here, when it tries to do 'function(loopVar)'

      object MapObjects {
      def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects =

      { val loopVar = LambdaVariable("MapObject", elementType, elementNullable) MapObjects(loopVar, function(loopVar), inputData, customCollectionCls) }


      I believe it may be possible to just use 'loopVar' instead of 'function(loopVar)', whenever 'function' is null, but need second opinion from catalyst developers on what a robust fix should be




            eje Erik Erlandson
            eje Erik Erlandson
            0 Vote for this issue
            5 Start watching this issue

