- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: None
- Fix Version/s: 1.8.0
- Component/s: None
- Labels: None
Hi!
I'm getting a null pointer exception when I try to write Parquet files with Spark.
Dec 13, 2014 3:05:10 AM WARNING: parquet.hadoop.ParquetOutputCommitter: could not write summary file for hdfs://phoenix-011.nym1.placeiq.net:8020/user/vkuzemchik/parquet_data/1789
java.lang.NullPointerException
    at parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:426)
    at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:402)
    at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:936)
    at com.placeiq.spark.KafkaReader$.writeParquetHadoop(KafkaReader.scala:143)
    at com.placeiq.spark.KafkaReader$$anonfun$3.apply(KafkaReader.scala:165)
    at com.placeiq.spark.KafkaReader$$anonfun$3.apply(KafkaReader.scala:164)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Here is the function I'm using:
Spark.scala
def writeParquetHadoop(rdd: RDD[(Void, LogMessage)]): Unit = {
  val jobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
  val job = new Job(jobConf)
  val outputDir = "hdfs://phoenix-011.nym1.placeiq.net:8020/user/vkuzemchik/parquet_data/"

  // Avro read/write support for the LogMessage records
  ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
  ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[LogMessage]])
  AvroParquetInputFormat.setAvroReadSchema(job, LogMessage.SCHEMA$)
  AvroParquetOutputFormat.setSchema(job, LogMessage.SCHEMA$)

  // Parquet output settings: Snappy compression, 512 MB block size
  ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
  ParquetOutputFormat.setBlockSize(job, 536870912)

  job.setOutputKeyClass(classOf[Void])
  job.setOutputValueClass(classOf[LogMessage])
  job.setOutputFormatClass(classOf[ParquetOutputFormat[LogMessage]])

  // One output directory per RDD/micro-batch
  job.getConfiguration.set("mapred.output.dir", outputDir + rdd.id)

  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
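For context, the stack trace shows this function being called from a streaming foreachRDD. A minimal sketch of the call site (here `messages` is an assumed DStream[(Void, LogMessage)] built from the Kafka input, not the actual KafkaReader code):

// Hypothetical call site, mirroring the stack trace (foreachRDD in KafkaReader.scala)
messages.foreachRDD { rdd =>
  // ParquetOutputCommitter.commitJob (and the failing mergeFooters) runs inside this call
  writeParquetHadoop(rdd)
}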
I have this issue on 1.5. I'm trying to reproduce it on newer versions.
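Since the warning only concerns the summary (_metadata) file, disabling the job summary might be an interim workaround (an assumption on my side, not verified; parquet-hadoop reads the parquet.enable.summary-metadata setting for this):

// Possible workaround sketch: skip writing the job summary file so commitJob
// never reaches ParquetFileWriter.mergeFooters. Add before saveAsNewAPIHadoopDataset.
job.getConfiguration.setBoolean("parquet.enable.summary-metadata", false)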