Description
The following boring code works up to the point where I read the parquet file back in.
import numpy as np
import pandas as pd
import pyspark
from pyspark import SQLContext, SparkContext, SparkConf

print pyspark.__version__

sc = SparkContext(conf=SparkConf().setMaster('local'))
df = pd.DataFrame({"mi": np.arange(100), "eid": np.arange(100)})
print df

sqlc = SQLContext(sc)
df = sqlc.createDataFrame(df)
# createOrReplaceTempView returns None, so don't rebind df to its result
df.createOrReplaceTempView("outcomes")

rdd = sqlc.sql("SELECT eid, mi FROM outcomes LIMIT 5")
print rdd.schema
rdd.show()

rdd.write.parquet("mi", mode="overwrite")
rdd2 = sqlc.read.parquet("mi")  # FAIL!
# print pyspark.__version__
2.2.0

# print df
    eid  mi
0     0   0
1     1   1
2     2   2
3     3   3
...
[100 rows x 2 columns]

# print rdd.schema
StructType(List(StructField(eid,LongType,true),StructField(mi,LongType,true)))

# rdd.show()
+---+---+
|eid| mi|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+
The read fails with:
rdd2 = sqlc.read.parquet("mixx")
  File "/usr/local/lib/python2.7/dist-packages/pyspark/sql/readwriter.py", line 291, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/dist-packages/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
The Parquet documentation says the format is self-describing, and the full schema was available when the file was saved. What gives?
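Since the error says the schema "must be specified manually", one possible workaround is a sketch like the following, which hands the reader the schema printed by rdd.schema above instead of relying on inference. This assumes the part-files on disk are actually intact; if the directory contains no readable parquet files at all, this yields an empty DataFrame rather than a real fix.

from pyspark.sql.types import StructType, StructField, LongType

# Schema copied from the rdd.schema output above
schema = StructType([
    StructField("eid", LongType(), True),
    StructField("mi", LongType(), True),
])
rdd2 = sqlc.read.schema(schema).parquet("mi")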
It works with master='local', but fails when my cluster is specified.
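That detail points at the filesystem rather than the file format: on a cluster, a relative path like "mi" resolves against each executor's local working directory, so the part-files can end up scattered across worker disks where the driver's read never sees them, leaving nothing for schema inference to look at. A minimal sketch of the usual remedy is to write to storage every node shares; the HDFS URI below is a placeholder, not a path from this report.

# Hypothetical shared location; substitute whatever HDFS/S3/NFS path
# is visible to both the driver and all executors.
shared_path = "hdfs:///user/me/mi"

rdd.write.parquet(shared_path, mode="overwrite")
rdd2 = sqlc.read.parquet(shared_path)  # inference can now see the part-files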