Spark / SPARK-25649

CatalystTypeConverters throws MatchError for ScalaEsRow type in ArrayConverter


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: Examples
    • Labels: None
    • Environment:

      Scala version: 2.2.1
      Elasticsearch-hadoop version: 6.4.1
      Java version: 1.8.0_181

    Description

      Hi,

I am trying to run PySpark to read Elasticsearch data using `elasticsearch-hadoop:6.4.2`.

Here, I have a few array fields, which look like this:

"outer_array": [
    { "inner_array": ["1", "2", "3"], "other_value": 4 },
    { "inner_array": ["5", "6", "7"], "other_value": 9 }
]

outer_array is an array of objects, and each object has a property that is itself an array. The issue is that Spark is not able to convert the ScalaEsRow type in ArrayConverter, as shown in the stack trace below.
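
For reference, the document shape above should map to a Spark schema along these lines (a sketch; nullability flags left at their defaults):

from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

# Sketch of the schema the sample document corresponds to:
# outer_array is an array of structs, and each struct holds an
# array of strings (inner_array) plus a scalar (other_value).
expected_schema = StructType([
    StructField("outer_array", ArrayType(StructType([
        StructField("inner_array", ArrayType(StringType())),
        StructField("other_value", IntegerType()),
    ])))
])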

For the conversion, I am providing the following read options:

df2 = spark.read \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .option("es.read.field.as.array.include", "outer_array,'outer_array.inner_array', 'outer_array.other_value'") \
    .format("org.elasticsearch.spark.sql") \
    .load("temp-index/customer")
df2.show(1)

Running this produces the following error:

      scala.MatchError: [Buffer(1, 2, 3),4] (of class org.elasticsearch.spark.sql.ScalaEsRow)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:160)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
      at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
      at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:61)
      at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:58)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
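
In the trace, ArrayConverter receives a ScalaEsRow ([Buffer(1, 2, 3),4]), i.e. the schema derived from the read options declares an array at a position where the data actually holds a struct. Two details of the option value look suspect: the single quotes embedded in the list (`es.read.field.as.array.include` appears to expect a plain comma-separated list of field names) and `outer_array.other_value`, which is a scalar in the sample document rather than an array. A minimal sketch of the same read listing only the fields that are genuinely arrays (untested against this index; node, port, and index name are taken from the report):

# Sketch: unquoted field list, marking only the actual array fields.
# other_value is a scalar in the sample document, so it is left out.
df2 = spark.read \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .option("es.read.field.as.array.include", "outer_array,outer_array.inner_array") \
    .format("org.elasticsearch.spark.sql") \
    .load("temp-index/customer")
df2.show(1)

If the mismatch does come from the option value, that would be consistent with this issue's Invalid resolution: the failure originates in the read configuration rather than in Spark itself.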

    People

    • Assignee: Unassigned
    • Reporter: Maulik Soneji