Spark: SPARK-26692

Structured Streaming: Aggregation + JOIN not working


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

    Description

      I tried to set up a simple streaming pipeline with two streams on two data sources (CSV files), where one stream is first windowed (aggregated) and then the two streams are joined. For output during development, I chose the console sink in Append mode.

      After several hours of setup and testing, I still wasn't able to get a working example running. I also found a StackOverflow question, https://stackoverflow.com/questions/52300247/spark-scala-structured-streaming-aggregation-and-self-join, whose answers report the same findings as mine: "In general, append output mode with aggregations is not a recommended way. As far as I understand". And: "I had the same empty output in my case with different aggregation. Once I changed it to update mode I got output. I have strong doubts that you will be able to do it in append mode. My understanding is that append mode only for map-like operations, e.g. filter, transform entry etc. I believe that multiphase processing".

      I observed the same and only got empty output in my example:

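      import static org.apache.spark.sql.functions.col;
      import static org.apache.spark.sql.functions.expr;
      import static org.apache.spark.sql.functions.window;

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.streaming.OutputMode;
      import org.apache.spark.sql.streaming.StreamingQuery;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructType;

      public class StreamGroupJoin {

          // Local session with 4 shuffle partitions for testing.
          public static SparkSession buildSession() throws Exception {
              return SparkSession.builder()
                      .appName("StreamGroupJoin")
                      .config("spark.sql.shuffle.partitions", 4)
                      .master("local[2]")
                      .getOrCreate();
          }

          // Streaming CSV source with an explicit schema; the first line is treated as a header.
          public static Dataset<Row> loadData(SparkSession session, String filepath) {
              return session
                      .readStream()
                      .format("csv")
                      .option("header", true)
                      .option("path", filepath)
                      .schema(new StructType()
                              .add("ts", DataTypes.TimestampType)
                              .add("color", DataTypes.StringType)
                              .add("data", DataTypes.StringType))
                      .load();
          }

          public static void main(String[] args) throws Exception {

              SparkSession session = buildSession();

              Dataset<Row> shieldStream = loadData(session, "streamingpoc/src/main/resources/simpleSHIELD");
              Dataset<Row> argusStream = loadData(session, "streamingpoc/src/main/resources/simpleARGUS");

              // Zero-delay event-time watermark on the raw SHIELD stream.
              shieldStream = shieldStream.withWatermark("ts", "0 hours");

              // ARGUS: watermark, then count rows per color in 24-hour tumbling windows.
              argusStream = argusStream.withWatermark("ts", "0 hours");
              argusStream = argusStream.groupBy(window(col("ts"), "24 hours"), col("color")).count();
              // Flatten the window struct so the join condition can reference its bounds.
              argusStream = argusStream.select(col("window.start").as("argusStart"), col("window.end").as("argusEnd"), col("color").as("argusColor"), col("count").as("argusCount"));
              // Alternative I also tried: a watermark on the aggregated window start.
              //argusStream = argusStream.withWatermark("argusStart", "0 hours");

              // Inner stream-stream join: same color, SHIELD event inside the ARGUS window.
              Dataset<Row> joinedStream = argusStream.join(shieldStream, expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
              joinedStream = joinedStream.withWatermark("ts", "0 hours");

              // Append mode to the console; in my runs this printed only empty batches.
              StreamingQuery joinedQuery = joinedStream.writeStream()
                      .outputMode(OutputMode.Append())
                      .format("console")
                      .start();

              joinedQuery.awaitTermination();

              System.out.println("DONE");
          }
      }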

      I'd like to point out that, at least in Spark 2.4.0 (the version I tested), it is not even possible to switch to OutputMode.Update(), because the query is rejected with: "Inner join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode"

      In my example, I used two simple CSV datasets with the same format and exactly one matching row, which should appear in the output after the JOIN.

      Without the JOIN, both streams (aggregated and non-aggregated) work fine. Without the aggregation, the JOIN works fine. But if I combine both (at least in Append mode), it doesn't work. If I use standard (batch) Spark DataFrames instead of Structured Streaming, I get the result I expected.
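      A batch version of the same pipeline might look like the following minimal sketch (not necessarily the exact code used for the comparison). It assumes the same input directories, swaps readStream() for read(), and drops the watermarks, since they only matter for streaming queries. The class name StreamGroupJoinBatch and its helper methods are hypothetical.

      ```java
      import static org.apache.spark.sql.functions.col;
      import static org.apache.spark.sql.functions.expr;
      import static org.apache.spark.sql.functions.window;

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructType;

      public class StreamGroupJoinBatch {

          // Same schema as the streaming example.
          static final StructType SCHEMA = new StructType()
                  .add("ts", DataTypes.TimestampType)
                  .add("color", DataTypes.StringType)
                  .add("data", DataTypes.StringType);

          // Batch read: session.read() instead of session.readStream().
          static Dataset<Row> loadBatch(SparkSession session, String path) {
              return session.read()
                      .format("csv")
                      .option("header", true)
                      .schema(SCHEMA)
                      .load(path);
          }

          // Same aggregation and join as the streaming query, without watermarks.
          static Dataset<Row> groupAndJoin(Dataset<Row> argus, Dataset<Row> shield) {
              Dataset<Row> counts = argus
                      .groupBy(window(col("ts"), "24 hours"), col("color"))
                      .count()
                      .select(col("window.start").as("argusStart"),
                              col("window.end").as("argusEnd"),
                              col("color").as("argusColor"),
                              col("count").as("argusCount"));
              return counts.join(shield,
                      expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"));
          }

          public static void main(String[] args) {
              SparkSession session = SparkSession.builder()
                      .appName("StreamGroupJoinBatch")
                      .master("local[2]")
                      .getOrCreate();
              Dataset<Row> joined = groupAndJoin(
                      loadBatch(session, "streamingpoc/src/main/resources/simpleARGUS"),
                      loadBatch(session, "streamingpoc/src/main/resources/simpleSHIELD"));
              joined.show(false);
              session.stop();
          }
      }
      ```

      Keeping groupAndJoin separate from the input loading makes it easy to run the same logic on small in-memory DataFrames.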

      Possible Solutions

      1. Either there is a bug or a misuse in my code. In that case, the ticket can be closed, and I'd be happy if someone could tell me what I did wrong. I tried quite a lot of different watermark settings but wasn't able to find a working setup.
      2. Fix OutputMode Append for this case, if technically possible. (From my theoretical understanding of big-data streaming in general, this should be possible, but I'm not deep into the topic and not that familiar with Spark after all.)
      3. Make this combination unavailable in Spark, i.e. fail with a pipeline error stating that an aggregated stream can't be joined in Append mode, as is already done when I try to join two aggregated streams. In that case, the documentation should also be updated to state that anyone who wants to perform aggregations and JOINs is advised to write the aggregation output to a sink like Kafka and read it back from there for the join.
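      The workaround in option 3 could be sketched as two queries: one that only aggregates and writes to a sink, and one that reads that sink back as a new stream and joins it. This is an untested sketch, assuming Spark's built-in parquet file sink as a stand-in for Kafka (to avoid an extra dependency); the class name TwoStageGroupJoin and the /tmp paths are hypothetical.

      ```java
      import static org.apache.spark.sql.functions.col;
      import static org.apache.spark.sql.functions.expr;
      import static org.apache.spark.sql.functions.window;

      import java.nio.file.Files;
      import java.nio.file.Paths;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.streaming.OutputMode;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructType;

      public class TwoStageGroupJoin {

          static final StructType INPUT_SCHEMA = new StructType()
                  .add("ts", DataTypes.TimestampType)
                  .add("color", DataTypes.StringType)
                  .add("data", DataTypes.StringType);

          // Schema of the intermediate aggregate files written by stage 1.
          static final StructType AGG_SCHEMA = new StructType()
                  .add("argusStart", DataTypes.TimestampType)
                  .add("argusEnd", DataTypes.TimestampType)
                  .add("argusColor", DataTypes.StringType)
                  .add("argusCount", DataTypes.LongType);

          // Hypothetical scratch locations for the intermediate sink and its checkpoint.
          static final String AGG_PATH = "/tmp/argus-agg";
          static final String AGG_CHECKPOINT = "/tmp/argus-agg-checkpoint";

          static Dataset<Row> csvStream(SparkSession session, String path) {
              return session.readStream()
                      .format("csv")
                      .option("header", true)
                      .schema(INPUT_SCHEMA)
                      .load(path);
          }

          public static void main(String[] args) throws Exception {
              SparkSession session = SparkSession.builder()
                      .appName("TwoStageGroupJoin")
                      .master("local[2]")
                      .getOrCreate();

              // The file source in stage 2 requires its input directory to exist.
              Files.createDirectories(Paths.get(AGG_PATH));

              // Stage 1: the windowed count alone. In Append mode a window is only
              // written after the watermark has passed the end of that window.
              csvStream(session, "streamingpoc/src/main/resources/simpleARGUS")
                      .withWatermark("ts", "0 hours")
                      .groupBy(window(col("ts"), "24 hours"), col("color"))
                      .count()
                      .select(col("window.start").as("argusStart"),
                              col("window.end").as("argusEnd"),
                              col("color").as("argusColor"),
                              col("count").as("argusCount"))
                      .writeStream()
                      .outputMode(OutputMode.Append())
                      .format("parquet")
                      .option("path", AGG_PATH)
                      .option("checkpointLocation", AGG_CHECKPOINT)
                      .start();

              // Stage 2: read the aggregates back as a new stream and join them with the
              // raw SHIELD stream. Without watermarks, this join keeps its state unbounded.
              Dataset<Row> aggStream = session.readStream().schema(AGG_SCHEMA).parquet(AGG_PATH);
              Dataset<Row> shieldStream = csvStream(session, "streamingpoc/src/main/resources/simpleSHIELD");

              aggStream.join(shieldStream, expr("color = argusColor AND ts >= argusStart AND ts <= argusEnd"))
                      .writeStream()
                      .outputMode(OutputMode.Append())
                      .format("console")
                      .option("truncate", false)
                      .start();

              // Block until either query stops or fails.
              session.streams().awaitAnyTermination();
          }
      }
      ```

      The point of the split is that each query then contains only one stateful step: stage 1 is a plain watermarked aggregation, and stage 2 is a plain inner stream-stream join, both of which Append mode supports on their own.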

       

      The following is the result I'd like to obtain (and which I get if I use batch Spark Datasets instead of Structured Streaming):

      +-------------------+-------------------+----------+----------+-------------------+-----+--------+
      |argusStart         |argusEnd           |argusColor|argusCount|ts                 |color|data    |
      +-------------------+-------------------+----------+----------+-------------------+-----+--------+
      |2018-07-20 02:00:00|2018-07-21 02:00:00|red       |1         |2018-07-20 12:01:15|red  |Iron Man|
      +-------------------+-------------------+----------+----------+-------------------+-----+--------+

      And here are the dummy CSV files I created:

      example-argus.csv

      ts,color,data
      2018-07-19T08:33:07Z,green,Green Lantern
      2018-07-20T00:00:00Z,red,Aquaman
      2018-07-20T07:00:00Z,green,Batman
      2018-07-20T10:00:00Z,green,Flash
      2018-07-21T10:01:13Z,green,Green Arrow
      2018-07-22T10:01:15Z,green,Robin
      2018-07-23T10:03:15Z,green,Starfire
      2018-07-26T10:07:23Z,green,Supergirl
      2018-07-26T10:13:23Z,red,Superman
      2018-07-26T11:01:01Z,green,Wonder Woman
      2018-07-26T14:02:11Z,green,Cyborg
      2018-07-28T14:05:53Z,green,Harley Quinn
      2018-07-30T05:00:13Z,green,Deadshot
      2018-08-10T09:23:32Z,green,El Diablo
      2018-08-10T12:12:12Z,green,Killer Croc

      example-shield.csv

      ts,color,data
      2018-07-19T10:01:13Z,blue,Captain America
      2018-07-20T10:01:15Z,red,Iron Man
      2018-07-20T10:03:15Z,blue,Thor
      2018-07-20T10:07:23Z,blue,Hulk
      2018-07-20T10:13:23Z,blue,Black Widow
      2018-07-20T11:01:01Z,blue,Hawkeye
      2018-07-20T14:02:11Z,blue,Loki
      2018-07-20T14:05:53Z,blue,Spider-Man
      2018-07-21T05:00:13Z,blue,Vision
      2018-07-21T09:23:32Z,blue,Scarlet Witch
      2018-07-21T12:12:12Z,blue,Dr. Strange
      2018-07-21T13:13:13Z,blue,Star-Lord
      2018-07-22T01:52:18Z,red,Drax
      2018-07-26T01:52:18Z,blue,Groot
      2018-08-10T12:12:12Z,blue,Rocket
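      The single expected match can be checked by hand. window(col("ts"), "24 hours") without a slide duration or start offset yields tumbling windows aligned to 1970-01-01 00:00:00 UTC, i.e. to midnight UTC. Assuming the session time zone was UTC+2 (which matches Iron Man's 10:01:15Z being shown as 12:01:15), the 02:00 bounds in the expected table are just that UTC midnight. The plain-Java sketch below (no Spark; the class ExpectedMatch and its methods are hypothetical) counts the ARGUS rows per window and color, then applies the same join condition to the SHIELD rows.

      ```java
      import java.time.Duration;
      import java.time.Instant;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.TreeMap;

      public class ExpectedMatch {

          static final long DAY = Duration.ofHours(24).getSeconds();

          // Rows as {ts, color, data}, copied from example-argus.csv.
          static final String[][] ARGUS = {
              {"2018-07-19T08:33:07Z", "green", "Green Lantern"}, {"2018-07-20T00:00:00Z", "red", "Aquaman"},
              {"2018-07-20T07:00:00Z", "green", "Batman"}, {"2018-07-20T10:00:00Z", "green", "Flash"},
              {"2018-07-21T10:01:13Z", "green", "Green Arrow"}, {"2018-07-22T10:01:15Z", "green", "Robin"},
              {"2018-07-23T10:03:15Z", "green", "Starfire"}, {"2018-07-26T10:07:23Z", "green", "Supergirl"},
              {"2018-07-26T10:13:23Z", "red", "Superman"}, {"2018-07-26T11:01:01Z", "green", "Wonder Woman"},
              {"2018-07-26T14:02:11Z", "green", "Cyborg"}, {"2018-07-28T14:05:53Z", "green", "Harley Quinn"},
              {"2018-07-30T05:00:13Z", "green", "Deadshot"}, {"2018-08-10T09:23:32Z", "green", "El Diablo"},
              {"2018-08-10T12:12:12Z", "green", "Killer Croc"},
          };

          // Rows as {ts, color, data}, copied from example-shield.csv.
          static final String[][] SHIELD = {
              {"2018-07-19T10:01:13Z", "blue", "Captain America"}, {"2018-07-20T10:01:15Z", "red", "Iron Man"},
              {"2018-07-20T10:03:15Z", "blue", "Thor"}, {"2018-07-20T10:07:23Z", "blue", "Hulk"},
              {"2018-07-20T10:13:23Z", "blue", "Black Widow"}, {"2018-07-20T11:01:01Z", "blue", "Hawkeye"},
              {"2018-07-20T14:02:11Z", "blue", "Loki"}, {"2018-07-20T14:05:53Z", "blue", "Spider-Man"},
              {"2018-07-21T05:00:13Z", "blue", "Vision"}, {"2018-07-21T09:23:32Z", "blue", "Scarlet Witch"},
              {"2018-07-21T12:12:12Z", "blue", "Dr. Strange"}, {"2018-07-21T13:13:13Z", "blue", "Star-Lord"},
              {"2018-07-22T01:52:18Z", "red", "Drax"}, {"2018-07-26T01:52:18Z", "blue", "Groot"},
              {"2018-08-10T12:12:12Z", "blue", "Rocket"},
          };

          // Start of the epoch-aligned (UTC) 24-hour tumbling window containing ts.
          static Instant windowStart(Instant ts) {
              return Instant.ofEpochSecond(Math.floorDiv(ts.getEpochSecond(), DAY) * DAY);
          }

          static List<String> matches(String[][] argus, String[][] shield) {
              // Aggregation: count ARGUS rows per (window start, color).
              Map<Instant, Map<String, Long>> counts = new TreeMap<>();
              for (String[] a : argus) {
                  counts.computeIfAbsent(windowStart(Instant.parse(a[0])), k -> new TreeMap<>())
                        .merge(a[1], 1L, Long::sum);
              }
              // Join: same color and SHIELD ts within [start, end], as in the expr() condition.
              List<String> out = new ArrayList<>();
              for (Map.Entry<Instant, Map<String, Long>> w : counts.entrySet()) {
                  Instant start = w.getKey();
                  Instant end = start.plusSeconds(DAY);
                  for (Map.Entry<String, Long> c : w.getValue().entrySet()) {
                      for (String[] s : shield) {
                          Instant ts = Instant.parse(s[0]);
                          if (s[1].equals(c.getKey()) && !ts.isBefore(start) && !ts.isAfter(end)) {
                              out.add(start + " " + end + " " + c.getKey() + " " + c.getValue() + " " + s[2]);
                          }
                      }
                  }
              }
              return out;
          }

          public static void main(String[] args) {
              matches(ARGUS, SHIELD).forEach(System.out::println);
          }
      }
      ```

      Running main prints one line, 2018-07-20T00:00:00Z 2018-07-21T00:00:00Z red 1 Iron Man, which is the single row of the expected table shown in UTC. No SHIELD row is green, and Drax (red, 2018-07-22) falls outside both red ARGUS windows.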


            People

              Assignee: Unassigned
              Reporter: Theo Diefenthal (TheoD)
              Votes: 0
              Watchers: 2
