Description
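BinaryType in Spark SQL orders values by comparing their bytes as signed values, so any byte in the range 0x80 to 0xFF sorts before any byte in the range 0x00 to 0x7F. This can lead to unexpected behavior, for example when range filters are applied to big-endian encoded numbers. The following snippet reproduces the problem; the assertions at the end state the expected row counts: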
import org.apache.spark.sql.functions.col

case class TestRecord(col0: Array[Byte])

// Encode a Long as 8 big-endian bytes.
def convertToBytes(i: Long): Array[Byte] = {
  val bb = java.nio.ByteBuffer.allocate(8)
  bb.putLong(i)
  bb.array
}

def test = {
  val sql = spark.sqlContext
  import sql.implicits._

  val timestamp = 1498772083037L
  val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
  val testDF = sc.parallelize(data).toDF

  // Three half-open ranges: [t, t+50), [t+50, t+100) and [t, t+100).
  val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L))
  val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L))
  val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L))

  assert(filter1.count == 50)
  assert(filter2.count == 50)
  assert(filter3.count == 100)
}
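To make the cause concrete without a Spark session, here is a minimal sketch in plain Scala that compares two adjacent values from the range used above. The object `ByteOrderingSketch` and the helpers `compareWith`, `signedCompare` and `unsignedCompare` are hypothetical names introduced for illustration; they are not Spark APIs and are not meant to mirror Spark's internal implementation.

```scala
import java.nio.ByteBuffer

object ByteOrderingSketch {
  // Same big-endian encoding as convertToBytes in the snippet above.
  def convertToBytes(i: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(i).array()

  // Lexicographic comparison; `cmp` decides how two single bytes compare.
  def compareWith(a: Array[Byte], b: Array[Byte])(cmp: (Byte, Byte) => Int): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = cmp(a(i), b(i))
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  // Bytes treated as signed values in -128..127 (the behavior described above).
  def signedCompare(a: Array[Byte], b: Array[Byte]): Int =
    compareWith(a, b)((x, y) => x - y)

  // Bytes treated as unsigned values in 0..255 (what the assertions assume).
  def unsignedCompare(a: Array[Byte], b: Array[Byte]): Int =
    compareWith(a, b)((x, y) => (x & 0xff) - (y & 0xff))

  def main(args: Array[String]): Unit = {
    val timestamp = 1498772083037L
    // The low byte of `timestamp` is 0x5D, so adding 34 gives a low byte of
    // 0x7F and adding 35 gives 0x80, which is negative as a signed byte.
    val smaller = convertToBytes(timestamp + 34L)
    val larger  = convertToBytes(timestamp + 35L)

    // Signed ordering puts the numerically larger value first.
    println(s"signed:   ${signedCompare(smaller, larger)}")
    // Unsigned ordering agrees with the numeric order of the Longs.
    println(s"unsigned: ${unsignedCompare(smaller, larger)}")
  }
}
```

With signed comparison the result is positive (the smaller number sorts after the larger one), while with unsigned comparison it is negative. Because timestamp + 50 already has a low byte above 0x7F, the upper bound of the first filter sorts below its lower bound under signed ordering, which is consistent with the counts in the snippet not matching.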