Details

Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 3.0.1
Fix Version/s: None
Description
I am getting zeros in the output of this test on IBM Z, which is a big-endian system. See the error below.
I think this issue is related to the use of IntegerType in the schema for FakeDefaultSource: modifying the schema to use LongType fixes it. Another workaround is to remove .select("a") (see the patches below).
My working theory is that the long data (the test values are generated by a Range, which produces longs) is being read using unsafe int operations, as the IntegerType schema dictates. That would 'work' on little-endian systems but not on big-endian ones. I'm still working out the exact mechanism and would appreciate any hints or insights.
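To illustrate the theory (a standalone JVM sketch, not Spark's actual read path): a 4-byte read over an 8-byte long happens to recover small values on a little-endian machine, because the low-order bytes come first, but it sees only the zero high-order bytes on a big-endian machine like IBM Z.

import java.nio.{ByteBuffer, ByteOrder}

// Store the value 5 as an 8-byte long, then read back only the first
// 4 bytes as an int -- the kind of width mismatch an IntegerType schema
// over long data would cause.
val buf = ByteBuffer.allocate(8)

// Little-endian (e.g. x86): low-order bytes come first, so a 4-byte read
// at offset 0 happens to recover the value.
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(0, 5L)
println(buf.getInt(0))  // prints 5

// Big-endian (e.g. IBM Z): high-order bytes come first, so the same read
// sees only zeros -- matching the all-zero output in the error below.
buf.order(ByteOrder.BIG_ENDIAN).putLong(0, 5L)
println(buf.getInt(0))  // prints 0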
The error looks like this:
- SPARK-20432: union one stream with itself *** FAILED ***
  Decoded objects do not match expected objects:
  expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  actual:   WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
  assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
  +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
     +- getcolumnbyordinal(0, LongType) (QueryTest.scala:88)
This change fixes the issue:
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import org.apache.spark.util.Utils

 class StreamSuite extends StreamTest {
@@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
 }

 abstract class FakeSource extends StreamSourceProvider {
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)

   override def sourceSchema(
       spark: SQLContext,
@@ -1287,7 +1287,7 @@ class FakeDefaultSource extends FakeSource {
     new Source {
       private var offset = -1L

-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = StructType(StructField("a", LongType) :: Nil)

       override def getOffset: Option[Offset] = {
         if (offset >= 10) {
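The LongType declaration also matches where the data actually comes from: the source builds its batches from a Range, and in Spark a range column is a non-nullable long. A quick spark-shell check (illustrative only, not part of the patch):

// spark.range produces a LongType column; renaming it to "a" shows the
// type the fake source's schema should be declaring.
val batch = spark.range(0, 11).toDF("a")
batch.printSchema()
// root
//  |-- a: long (nullable = false)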
Alternatively, this change also fixes the issue:
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -154,7 +154,7 @@ class StreamSuite extends StreamTest {
   }

   test("SPARK-20432: union one stream with itself") {
-    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
     val unioned = df.union(df)
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>