Description
The pipe action converts objects into strings in a way that depends on the default encoding setting of the Python environment.
Here is the related code fragment (L717-721@python/pyspark/rdd.py):
    def pipe_objs(out):
        for obj in iterator:
            s = str(obj).rstrip('\n') + '\n'
            out.write(s.encode('utf-8'))
        out.close()
The `str(obj)` part implicitly converts `obj` to a unicode string and then encodes it into a byte string using the default encoding. The `s.encode('utf-8')` part, on the other hand, implicitly decodes `s` back into a unicode string using the default encoding and then encodes it (AGAIN!) into a UTF-8 encoded byte string.
Typically the default encoding of a Python environment is 'ascii'. Passing a unicode string that contains characters outside the 'ascii' charset therefore raises a UnicodeEncodeError at `str(obj)`, and passing a byte string that contains bytes with a value of 128 or above raises a UnicodeDecodeError at `s.encode('utf-8')`.
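The two implicit conversions can be made visible by spelling them out explicitly. The following is only an illustrative sketch, not PySpark code: the helper `try_call` and the variable names are hypothetical, and the explicit `'ascii'` codec stands in for the default encoding that Python 2 applies implicitly.

```python
# Illustration only: 'ascii' stands in for the interpreter's default encoding.
text = u'\u6d4b\u8bd5'        # unicode string with non-ASCII characters
raw = text.encode('utf-8')    # the same text as a UTF-8 byte string


def try_call(fn):
    """Run fn and return the name of the Unicode error it raises, if any."""
    try:
        fn()
        return None
    except UnicodeError as exc:
        return type(exc).__name__


# What str(obj) does implicitly to a unicode string: encode with the default codec.
encode_error = try_call(lambda: text.encode('ascii'))

# What s.encode('utf-8') does implicitly to a byte string: decode with the default codec first.
decode_error = try_call(lambda: raw.decode('ascii'))

print(encode_error)
print(decode_error)
```

The first call fails while encoding and the second while decoding, which corresponds to the two different exception types in the tracebacks of this report.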
Changing `str(obj)` to `unicode(obj)` would eliminate these problems.
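As a rough sketch of what a conversion without implicit encoding steps could look like, consider the helper below. It is not the actual patch: the name `to_utf8_line` is hypothetical, and treating incoming byte strings as UTF-8 is an assumption that goes slightly beyond the one-line `unicode(obj)` change suggested here.

```python
def to_utf8_line(obj):
    """Return obj as one UTF-8 encoded, newline-terminated byte string."""
    if isinstance(obj, bytes):
        # Assumption: byte strings handed to pipe() are already UTF-8 encoded.
        text = obj.decode('utf-8')
    else:
        # Text conversion of obj; a unicode format string keeps the result
        # unicode, so no default-encoding round trip is involved.
        text = u'%s' % (obj,)
    # Normalize the trailing newline, then encode explicitly exactly once.
    return (text.rstrip(u'\n') + u'\n').encode('utf-8')


print(repr(to_utf8_line(u'\u6d4b\u8bd5')))
print(repr(to_utf8_line(u'\u6d4b\u8bd5'.encode('utf-8'))))
```

With such a helper, the loop body would reduce to `out.write(to_utf8_line(obj))`, and both inputs from the reproduction would yield the same bytes.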
The following code snippet reproduces these errors:
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 1.6.3
          /_/

    Using Python version 2.7.12 (default, Jul 25 2016 15:06:45)
    SparkContext available as sc, HiveContext available as sqlContext.
    >>> sc.parallelize([u'\u6d4b\u8bd5']).pipe('cat').collect()
    [Stage 0:> (0 + 4) / 4]Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/Users/wxz/Downloads/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 719, in pipe_objs
        s = str(obj).rstrip('\n') + '\n'
    UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)
    >>>
    >>> sc.parallelize([u'\u6d4b\u8bd5']).map(lambda x: x.encode('utf-8')).pipe('cat').collect()
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/local/Cellar/python/2.7.12/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/Users/wxz/Downloads/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 720, in pipe_objs
        out.write(s.encode('utf-8'))
    UnicodeDecodeError: 'ascii' codec can't decode byte 0xe6 in position 0: ordinal not in range(128)