Spark / SPARK-16350

Complete output mode does not output updated aggregated value in Structured Streaming


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.0
    • Component/s: Structured Streaming

      Description

      Given the following program:

      // A simple Order <-> Items model
      case class Order (
        orderid: Int,
        customer: String
      )
      
      case class Item (
        orderid: Int,
        itemid: Int,
        amount: Float
      )
      
      import spark.implicits._
      
      // in the Spark 2.0 shell a SparkSession ("spark") is already available
      val sqlContext = spark.sqlContext
      
      // input data comes from CSV files
      val INPUT_DIR: String = ""
      val FILE_EXTENSION: String = ".tbl"
      val ORDERS_FILE : String = INPUT_DIR + "orders" + FILE_EXTENSION
      
      // Items are added as a stream so input is a directory, not a file
      val ITEM_DIR  : String = INPUT_DIR + "item"
      
      import org.apache.spark.sql.types._
      import org.apache.spark.sql._
      
      val ItemSchema = StructType(
        StructField("orderid", IntegerType, false) ::
          StructField("itemid", IntegerType, true) ::
          StructField("amount", FloatType, true) ::
          Nil)
      
      val OrderSchema = StructType(
        StructField("orderid", IntegerType, false) ::
          StructField("customer", StringType, true) ::
          Nil)
      
      val csvOptions = Map("sep"  -> "|")
      
      val orders = sqlContext.read.schema(OrderSchema).options(csvOptions).csv(ORDERS_FILE).as[Order]
      orders.createOrReplaceTempView("orders")
      
      // .csv(...) already selects the csv source, so format("csv") is not needed
      val itemsStream = sqlContext.readStream.schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
      itemsStream.createOrReplaceTempView("itemsStream")
      
      val sum_of_items_per_customer_streamed =
        sqlContext.sql("SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")
      
      // print each computed value
      val outwriter = new ForeachWriter[Row] {
        def open(partitionId: Long, version: Long): Boolean = true
        def process(value: Row): Unit = if (value != null) print(value)
        def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
      }
      
      sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start()
      

      and the following data sets:

      • orders.tbl
        orderid customer
        1 foo
        2 bar
        3 foo
      • items1.tbl
        orderid itemid amount
        1 1 1.0
        1 2 1.0
        1 3 1.0
        2 1 1.0
        2 2 1.0
        3 1 1.0
      • items2.tbl
        orderid itemid amount
        1 4 1.0
        2 5 1.0

      When I do the following actions:

      • start bin/spark-shell
      • :load complete-bug.scala
      • cp items1.tbl item/
      • cp items2.tbl item/

      Then the following results are printed to the console:

      [bar,2.0][foo,4.0]
      [bar,1.0][foo,1.0]
      

      I would expect the following:

      [bar,2.0][foo,4.0]
      [bar,3.0][foo,5.0]
      
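
      For reference, the cumulative behaviour expected from complete output mode can be sketched in plain Scala, without Spark. The object and method names here (CompleteModeSemantics, merge) are illustrative only, not part of the report; the sketch models what the streaming aggregation should accumulate across the two micro-batches:

```scala
// Plain-Scala model of complete output mode semantics: every trigger, the
// sink should receive the ENTIRE updated result table, not just the rows
// derived from the newest batch. Names are hypothetical, for illustration.
object CompleteModeSemantics {
  // orders.tbl: orderid -> customer
  val orders: Map[Int, String] = Map(1 -> "foo", 2 -> "bar", 3 -> "foo")

  // Merge one micro-batch of (orderid, amount) pairs into the running
  // per-customer aggregate, joining each item to its order's customer.
  def merge(state: Map[String, Double], batch: List[(Int, Double)]): Map[String, Double] =
    batch.foldLeft(state) { case (acc, (orderid, amount)) =>
      val customer = orders(orderid)
      acc.updated(customer, acc.getOrElse(customer, 0.0) + amount)
    }

  // items1.tbl and items2.tbl as the two micro-batches
  val batch1 = List((1, 1.0), (1, 1.0), (1, 1.0), (2, 1.0), (2, 1.0), (3, 1.0))
  val batch2 = List((1, 1.0), (2, 1.0))

  val afterBatch1: Map[String, Double] = merge(Map.empty, batch1) // foo -> 4.0, bar -> 2.0
  val afterBatch2: Map[String, Double] = merge(afterBatch1, batch2) // cumulative: foo -> 5.0, bar -> 3.0

  def main(args: Array[String]): Unit = {
    println(afterBatch1)
    println(afterBatch2)
  }
}
```

      After the first batch the full result table is [bar,2.0][foo,4.0]; after the second batch complete mode should emit the cumulative [bar,3.0][foo,5.0] rather than only the per-batch sums.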

      People

      • Assignee: Liwei Lin (proflin)
      • Reporter: Arnaud Bailly (arnaud.bailly)
      • Votes: 0
      • Watchers: 4
