Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Invalid
Affects Version/s: 2.4.0
Description
I tried to set up a simple streaming pipeline with two streams reading from two data sources (CSV files), where one stream is first windowed (aggregated) and the two streams are then joined. For development, I wrote the output to the console sink in Append mode.
After several hours of setup and testing, I still wasn't able to get a working example. I also found a Stack Overflow question (https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join) whose participants reported the same findings: "In general, append output mode with aggregations is not a recommended way. As far as I understand". And: "I had the same empty output in my case with different aggregation. Once I changed it to update mode I got output. I have strong doubts that you will be able to do it in append mode. My understanding is that append mode only for map-like operations, e.g. filter, transform entry etc. I believe that multiphase processing".
I observed the same behavior: my example only produces empty output.
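```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Class name assumed from the appName used below.
public class StreamGroupJoin {

    // Local development session: 2 cores, 4 shuffle partitions.
    public static SparkSession buildSession() throws Exception {
        return SparkSession.builder()
                .appName("StreamGroupJoin")
                .config("spark.sql.shuffle.partitions", 4)
                .master("local[2]")
                .getOrCreate();
    }

    // Streaming CSV source with a header row and the schema (ts, color, data).
    public static Dataset<Row> loadData(SparkSession session, String filepath) {
        return session
                .readStream()
                .format("csv")
                .option("header", true)
                .option("path", filepath)
                .schema(new StructType()
                        .add("ts", DataTypes.TimestampType)
                        .add("color", DataTypes.StringType)
                        .add("data", DataTypes.StringType))
                .load();
    }

    public static void main(String[] args) throws Exception {
        SparkSession session = buildSession();
        Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
        Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");

        // Watermarks on the event-time column of both inputs.
        shieldStream = shieldStream.withWatermark("ts", "0 hours");
        argusStream = argusStream.withWatermark("ts", "0 hours");

        // Count ARGUS rows per 24-hour window and color, then flatten the window struct.
        argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
        argusStream = argusStream.select(
                col("window.start").as("argusStart"),
                col("window.end").as("argusEnd"),
                col("color").as("argusColor"),
                col("count").as("argusCount"));
        //argusStream = argusStream.withWatermark("argusStart", "0 hours");

        // Join the aggregated stream with the raw SHIELD stream on color and window range.
        Dataset<Row> joinedStream = argusStream.join(shieldStream,
                expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
        joinedStream = joinedStream.withWatermark("ts", "0 hours");

        // Console sink in Append mode; this is where only empty output appeared.
        StreamingQuery joinedQuery = joinedStream.writeStream()
                .outputMode(OutputMode.Append())
                .format("console")
                .start();
        joinedQuery.awaitTermination();
        System.out.println("DONE");
    }
}
```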
I'd like to point out that, at least in Spark 2.4.0 (the version I tested), it is not even possible to switch to OutputMode.Update(), because the query is rejected with: "Inner join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode"
For my example I used two simple CSV datasets with the same format and exactly one matching pair of rows, which should produce one output row after the join.
Without the join, both streams (aggregated and non-aggregated) work fine. Without the aggregation, the join works fine. But when I combine both (at least in Append mode), I get no output. When I use standard (batch) Spark DataFrames instead of Structured Streaming, I get the result I expect.
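One factor that may help when reading these observations, sketched here as a simplified model and not as Spark's actual implementation: in Append mode a windowed aggregate row is only emitted once the watermark has passed the end of its window, and the watermark a micro-batch uses is derived from event times seen in earlier batches (maximum event time minus the delay passed to withWatermark). The plain-Java sketch below uses hypothetical names (AppendModeModel, simulate) and deliberately ignores late-row dropping, multiple input watermarks and the downstream join. It shows that a first batch containing all rows emits nothing, and that under this model the last window of a stream stays open until later data moves the watermark past its end.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.*;

// Simplified, hypothetical model of Append-mode output for
// groupBy(window(ts, size), ...).count() combined with withWatermark(ts, delay).
// Ignores late-row dropping, multiple watermarks and any downstream join.
final class AppendModeModel {

    // Start of the epoch-aligned tumbling window that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()));
    }

    // Returns, for each micro-batch in order, the window starts emitted in that batch.
    static List<List<Instant>> simulate(List<List<Instant>> batches, Duration size, Duration delay) {
        TreeMap<Instant, Long> openWindows = new TreeMap<>(); // window start -> running count (state)
        Instant maxEventTime = null;                           // max ts seen in earlier batches
        List<List<Instant>> emittedPerBatch = new ArrayList<>();

        for (List<Instant> batch : batches) {
            // The watermark for this batch only reflects previous batches;
            // before any data has been seen it is the epoch (0).
            Instant watermark = (maxEventTime == null) ? Instant.EPOCH : maxEventTime.minus(delay);

            for (Instant ts : batch) {
                openWindows.merge(windowStart(ts, size), 1L, Long::sum);
                if (maxEventTime == null || ts.isAfter(maxEventTime)) {
                    maxEventTime = ts;
                }
            }

            // A window is final, and emitted exactly once, when its end is <= the watermark.
            List<Instant> emitted = new ArrayList<>();
            Iterator<Map.Entry<Instant, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Instant start = it.next().getKey();
                if (!start.plus(size).isAfter(watermark)) {
                    emitted.add(start);
                    it.remove();
                }
            }
            emittedPerBatch.add(emitted);
        }
        return emittedPerBatch;
    }
}
```

For example, feeding the two red ARGUS rows (2018-07-20T00:00:00Z and 2018-07-26T10:13:23Z) as one batch, followed by an empty batch, with a 24-hour window and a zero delay: the first batch emits nothing, the second releases only the 2018-07-20 window, and the 2018-07-26 window stays open.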
Possible Solutions
- Either there is a bug or a misuse in my code. In that case, the ticket can be closed, and I'd be happy if someone could tell me what I did wrong. I tried many different watermark settings but couldn't find a working setup.
- Fix OutputMode Append for this case, if that is technically possible. (From my theoretical understanding of big-data streaming in general, it should be, but I'm not deep into the topic and I'm not very familiar with Spark.)
- Make this combination unavailable in Spark, i.e. fail the pipeline with an error stating that an aggregated stream can't be joined in Append mode, as is already done when I try to join two aggregated streams. In that case, the documentation should also be updated to state that anyone who wants to perform aggregations followed by joins is advised to write the aggregation output to a sink such as Kafka and re-read it from there for the join.
The following is the result I'd like to obtain (and which I do get when I use batch Spark Datasets instead of Spark Structured Streaming):
```
+-------------------+-------------------+----------+----------+-------------------+-----+--------+
|argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
+-------------------+-------------------+----------+----------+-------------------+-----+--------+
|2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
+-------------------+-------------------+----------+----------+-------------------+-----+--------+
```
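The 02:00:00 window boundaries in this row can be reproduced by hand. With no startTime offset, a 24-hour tumbling window is aligned to the Unix epoch (UTC), so Iron Man's 2018-07-20T10:01:15Z falls into [2018-07-20T00:00:00Z, 2018-07-21T00:00:00Z). Shown in a UTC+2 time zone, which we assume was the session time zone here (for example Central European Summer Time in July), that becomes 02:00:00 to 02:00:00 the next day, and the event itself shows as 12:01:15. The sketch below uses only java.time; WindowBounds and its methods are hypothetical helpers, not Spark API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helpers (not Spark API) that mimic a 24-hour tumbling window with
// no startTime offset, i.e. windows aligned to 1970-01-01T00:00:00Z.
final class WindowBounds {
    static final Duration SIZE = Duration.ofHours(24);
    private static final DateTimeFormatter TABLE_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Inclusive start of the window containing ts.
    static Instant start(Instant ts) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, SIZE.toMillis()));
    }

    // Exclusive end of the window containing ts.
    static Instant end(Instant ts) {
        return start(ts).plus(SIZE);
    }

    // Formats an instant in the given zone using the layout of the table above.
    static String render(Instant t, ZoneId zone) {
        return TABLE_FORMAT.withZone(zone).format(t);
    }
}
```

With ZoneId.of("+02:00"), the window start and end for Iron Man's timestamp render as 2018-07-20 02:00:00 and 2018-07-21 02:00:00, and the timestamp itself as 2018-07-20 12:01:15, matching the expected row.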
And these are the dummy CSV files I created:
example-argus.csv
```
ts,color,data
2018-07-19T08:33:07Z,green,Green Lantern
2018-07-20T00:00:00Z,red,Aquaman
2018-07-20T07:00:00Z,green,Batman
2018-07-20T10:00:00Z,green,Flash
2018-07-21T10:01:13Z,green,Green Arrow
2018-07-22T10:01:15Z,green,Robin
2018-07-23T10:03:15Z,green,Starfire
2018-07-26T10:07:23Z,green,Supergirl
2018-07-26T10:13:23Z,red,Superman
2018-07-26T11:01:01Z,green,Wonder Woman
2018-07-26T14:02:11Z,green,Cyborg
2018-07-28T14:05:53Z,green,Harley Quinn
2018-07-30T05:00:13Z,green,Deadshot
2018-08-10T09:23:32Z,green,El Diablo
2018-08-10T12:12:12Z,green,Killer Croc
```
example-shield.csv
```
ts,color,data
2018-07-19T10:01:13Z,blue,Captain America
2018-07-20T10:01:15Z,red,Iron Man
2018-07-20T10:03:15Z,blue,Thor
2018-07-20T10:07:23Z,blue,Hulk
2018-07-20T10:13:23Z,blue,Black Widow
2018-07-20T11:01:01Z,blue,Hawkeye
2018-07-20T14:02:11Z,blue,Loki
2018-07-20T14:05:53Z,blue,Spider-Man
2018-07-21T05:00:13Z,blue,Vision
2018-07-21T09:23:32Z,blue,Scarlet Witch
2018-07-21T12:12:12Z,blue,Dr. Strange
2018-07-21T13:13:13Z,blue,Star-Lord
2018-07-22T01:52:18Z,red,Drax
2018-07-26T01:52:18Z,blue,Groot
2018-08-10T12:12:12Z,blue,Rocket
```
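To double-check that exactly one row should come out of the join, the batch semantics of the intended query can be recomputed without Spark. This is a hypothetical plain-Java sketch (ExpectedJoin is not part of any library): it counts ARGUS rows per (epoch-aligned 24-hour window, color) and then applies the join condition from the streaming code, color = argusColor AND ts >= argusStart AND ts <= argusEnd, to the SHIELD rows. Timestamps are kept in UTC, so no time zone shift is applied.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.*;

// Hypothetical helper: a batch (non-streaming) recomputation of the intended query,
// used only to check which rows the join should produce. Not Spark code.
final class ExpectedJoin {
    static final long DAY_MS = Duration.ofHours(24).toMillis();

    // Each input row is {ts (ISO-8601 instant), color, data}, as in the CSV files.
    static List<String> join(List<String[]> argus, List<String[]> shield) {
        // Step 1: count ARGUS rows per (24-hour tumbling window aligned to the epoch, color).
        Map<String, Long> counts = new TreeMap<>(); // key: "<windowStartMillis>|<color>"
        for (String[] row : argus) {
            long ms = Instant.parse(row[0]).toEpochMilli();
            long start = ms - Math.floorMod(ms, DAY_MS);
            counts.merge(start + "|" + row[1], 1L, Long::sum);
        }
        // Step 2: inner join with SHIELD on color = argusColor AND argusStart <= ts <= argusEnd.
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            String[] key = e.getKey().split("\\|");
            Instant start = Instant.ofEpochMilli(Long.parseLong(key[0]));
            Instant end = start.plusMillis(DAY_MS);
            for (String[] row : shield) {
                Instant ts = Instant.parse(row[0]);
                boolean inWindow = !ts.isBefore(start) && !ts.isAfter(end);
                if (row[1].equals(key[1]) && inWindow) {
                    // Columns: argusStart, argusEnd, argusColor, argusCount, ts, color, data
                    joined.add(start + "," + end + "," + key[1] + "," + e.getValue()
                            + "," + ts + "," + row[1] + "," + row[2]);
                }
            }
        }
        return joined;
    }
}
```

Run over the two files above, only the red 2018-07-20 window (count 1, from Aquaman) finds a partner, Iron Man; Drax (2018-07-22) falls outside both red ARGUS windows, and Superman's 2018-07-26 window has no red SHIELD row.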
Issue Links
- relates to: SPARK-15428 Disable support for multiple streaming aggregations (Resolved)