Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-8450

PySpark write.parquet raises Unsupported datatype DecimalType()

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.5.0
    • PySpark, SQL
    • None
    • Spark 1.4.0 on Debian

    Description

      I'm getting an Exception when I try to save a DataFrame with a DeciamlType as an parquet file

      Minimal Example:

      from decimal import Decimal
      from pyspark.sql import SQLContext
      from pyspark.sql.types import *
      
      sqlContext = SQLContext(sc)
      schema = StructType([
          StructField('id', LongType()),
          StructField('value', DecimalType())])
      rdd = sc.parallelize([[1, Decimal("0.5")],[2, Decimal("2.9")]])
      df = sqlContext.createDataFrame(rdd, schema)
      df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
      
      

      Stack Trace

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-19-a77dac8de5f3> in <module>()
      ----> 1 sr.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in parquet(self, path, mode)
          367         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
          368         """
      --> 369         return self._jwrite.mode(mode).parquet(path)
          370 
          371     @since(1.4)
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
          536         answer = self.gateway_client.send_command(command)
          537         return_value = get_return_value(answer, self.gateway_client,
      --> 538                 self.target_id, self.name)
          539 
          540         for temp_arg in temp_args:
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          298                 raise Py4JJavaError(
          299                     'An error occurred while calling {0}{1}{2}.\n'.
      --> 300                     format(target_id, '.', name), value)
          301             else:
          302                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling o361.parquet.
      : org.apache.spark.SparkException: Job aborted.
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
      	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
      	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
      	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
      	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
      	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
      	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
      	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
      	at py4j.Gateway.invoke(Gateway.java:259)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:207)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 35.0 failed 4 times, most recent failure: Lost task 158.3 in stage 35.0 (TID 2736, 10.2.160.14): java.lang.RuntimeException: Unsupported datatype DecimalType()
      	at scala.sys.package$.error(package.scala:27)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:374)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:318)
      	at scala.Option.getOrElse(Option.scala:120)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:317)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:398)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:397)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      	at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:396)
      	at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:150)
      	at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
      	at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
      	at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
      	at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
      	at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
      	at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
      	at org.apache.spark.scheduler.Task.run(Task.scala:70)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      

      I also tried to set the precision < 18

      schema = StructType([
          StructField('id', LongType()),
          StructField('value', DecimalType(16,2))])
      

      which raises a different exception

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-23-bba70b7c0805> in <module>()
      ----> 1 df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in parquet(self, path, mode)
          367         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
          368         """
      --> 369         return self._jwrite.mode(mode).parquet(path)
          370 
          371     @since(1.4)
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
          536         answer = self.gateway_client.send_command(command)
          537         return_value = get_return_value(answer, self.gateway_client,
      --> 538                 self.target_id, self.name)
          539 
          540         for temp_arg in temp_args:
      
      /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          298                 raise Py4JJavaError(
          299                     'An error occurred while calling {0}{1}{2}.\n'.
      --> 300                     format(target_id, '.', name), value)
          301             else:
          302                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling o417.parquet.
      : org.apache.spark.SparkException: Job aborted.
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
      	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
      	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
      	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
      	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
      	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
      	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
      	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
      	at py4j.Gateway.invoke(Gateway.java:259)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:207)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 159 in stage 41.0 failed 4 times, most recent failure: Lost task 159.3 in stage 41.0 (TID 3211, 10.2.160.14): org.apache.spark.SparkException: Task failed while writing rows.
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
      	at org.apache.spark.scheduler.Task.run(Task.scala:70)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal
      	at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:365)
      	at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:335)
      	at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:321)
      	at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
      	at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
      	at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
      	at org.apache.spark.sql.parquet.ParquetOutputWriter.write(newParquet.scala:114)
      	at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:154)
      	... 8 more
      
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      
      

      The corresponding Scala Version works

      import org.apache.spark.SparkContext
      import org.apache.spark.sql.{ Row, SQLContext }
      import org.apache.spark.sql.types.{ DecimalType, IntegerType, StructType, StructField }
       
      object ParquetDecimal {
        def main(args: Array[String]) {
          // Connect to Spark
          val sc = new SparkContext()
          val sqlContext = new SQLContext(sc)
       
          val schema = StructType(Seq(StructField("id", IntegerType), StructField("value", DecimalType(16, 2))))
          val rows = sc.parallelize(Seq(Row(1, BigDecimal("0.9")), Row(2, BigDecimal("2.9"))))
          val df = sqlContext.createDataFrame(rows, schema)
          df.write.parquet("test.parquet")
        }
      }
      

      Attachments

        Activity

          People

            davies Davies Liu
            hoffmann Juergen Hoffmann
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: