Details
Type: Improvement
Status: Resolved
Priority: Critical
Resolution: Duplicate
Affects Version/s: 2.3.0
Fix Version/s: None
None
Description
Exception thrown:
scala> 18/06/01 22:47:04 ERROR MicroBatchExecution: Query [id = 0bdc4428-5d21-4237-9d64-898ae65f28f3, runId = f6822423-2bd2-47c1-8ed6-799d1c481195] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 2 followed by -1
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:197)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$2$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:377)
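The message in the trace reads like an ordering check on committed offsets: a commit is rejected when it is lower than the last one accepted. The following is a minimal sketch of such a guard in plain Scala, not Spark's actual `TextSocketMicroBatchReader` code. The `CommitTracker` class, its `-1` starting offset, and the demo object are hypothetical names introduced only for illustration.

```scala
// Hypothetical stand-in for a source that tracks committed offsets.
// This is an illustration only and is not taken from the Spark code base.
final class CommitTracker(initial: Long = -1L) {
  private var lastCommitted: Long = initial

  def lastOffsetCommitted: Long = lastCommitted

  // Accept an offset only if it does not move backwards.
  def commit(end: Long): Unit = {
    if (end < lastCommitted) {
      // sys.error throws a java.lang.RuntimeException with this message.
      sys.error(s"Offsets committed out of order: $lastCommitted followed by $end")
    }
    lastCommitted = end
  }
}

object CommitTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new CommitTracker()
    tracker.commit(2L) // accepted: 2 is not below the initial -1
    val result = scala.util.Try(tracker.commit(-1L)) // rejected: moves backwards
    // prints "Offsets committed out of order: 2 followed by -1"
    println(result.failed.map(_.getMessage).getOrElse("no error"))
  }
}
```

Under this assumption, committing offset 2 and then -1 reproduces the wording of the reported exception. How the restarted query ends up committing -1 after 2 is not shown in this report.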
Sample code that reproduces the error on restarting the query.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.streaming.Trigger

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

val words = lines.as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")

val windowedCounts = words
  .groupBy(window($"timestamp", "20 minutes", "20 minutes"), $"word")
  .count()
  .orderBy("window")

val query = windowedCounts.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/debug")
  .format("console")
  .option("truncate", "false")
  .start()
Issue Links
- duplicates SPARK-23844 Socket Stream recovering from checkpoint will throw exception (In Progress)
- links to