Spark / SPARK-23288

Incorrect number of written records in structured streaming


    Description

I'm using SparkListener.onTaskEnd() to capture input and output metrics, but the number of written records (taskEnd.taskMetrics().outputMetrics().recordsWritten()) appears to be incorrect. Here is my stream construction:

       

      StreamingQuery writeStream = session
              .readStream()
              .schema(RecordSchema.fromClass(TestRecord.class))
              .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
              .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
              .csv(inputFolder.getRoot().toPath().toString())
              .as(Encoders.bean(TestRecord.class))
              .flatMap(
                  (FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
                      List<TestVendingRecord> resultIterable = new ArrayList<>();
                      try {
                          TestVendingRecord result = transformer.convert(u);
                          resultIterable.add(result);
                      } catch (Throwable t) {
                          System.err.println("Ooops");
                          t.printStackTrace();
                      }
                      return resultIterable.iterator();
                  },
                  Encoders.bean(TestVendingRecord.class))
              .writeStream()
              .outputMode(OutputMode.Append())
              .format("parquet")
              .option("path", outputFolder.getRoot().toPath().toString())
              .option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
              .start();

      writeStream.processAllAvailable();
      writeStream.stop();
      

I tested it with one good input record and one bad one (which throws an exception in transformer.convert(u)), and it produced the following metrics:

       

      (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
      (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
      (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
      (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
      (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
      (TestMain.java:onTaskEnd(85)) - value =  323
      (TestMain.java:onTaskEnd(84)) - name = number of output rows
      (TestMain.java:onTaskEnd(85)) - value =  2
      (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
      (TestMain.java:onTaskEnd(85)) - value =  364
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
      (TestMain.java:onTaskEnd(85)) - value =  2
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
      (TestMain.java:onTaskEnd(85)) - value =  157
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
      (TestMain.java:onTaskEnd(85)) - value =  3
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
      (TestMain.java:onTaskEnd(85)) - value =  2396
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
      (TestMain.java:onTaskEnd(85)) - value =  633807000
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
      (TestMain.java:onTaskEnd(85)) - value =  683
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
      (TestMain.java:onTaskEnd(85)) - value =  55662000
      (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
      (TestMain.java:onTaskEnd(85)) - value =  58
      (TestMain.java:onTaskEnd(89)) - input records 2
      
      Streaming query made progress: {
        "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
        "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
        "name" : null,
        "timestamp" : "2018-01-26T14:44:05.362Z",
        "numInputRows" : 2,
        "processedRowsPerSecond" : 0.8163265306122448,
        "durationMs" : {
          "addBatch" : 1994,
          "getBatch" : 126,
          "getOffset" : 52,
          "queryPlanning" : 220,
          "triggerExecution" : 2450,
          "walCommit" : 41
        },
        "stateOperators" : [ ],
        "sources" : [ {
          "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
          "startOffset" : null,
          "endOffset" : {
            "logOffset" : 0
          },
          "numInputRows" : 2,
          "processedRowsPerSecond" : 0.8163265306122448
        } ],
        "sink" : {
          "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
        }
      }
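For reference, a progress report like the one above can also be captured programmatically with a StreamingQueryListener. This is only a minimal sketch (the ProgressLogger class name is illustrative; the Spark API names are real), not the exact code used here:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class ProgressLogger {
    // Attach a listener that prints each batch's progress report as JSON,
    // similar to the "Streaming query made progress" output above.
    public static void attach(SparkSession session) {
        session.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent event) { }

            @Override
            public void onQueryProgress(QueryProgressEvent event) {
                System.out.println("Streaming query made progress: "
                    + event.progress().prettyJson());
            }

            @Override
            public void onQueryTerminated(QueryTerminatedEvent event) { }
        });
    }
}
```

The listener must be registered before the query is started so that the first batch's progress is not missed.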
      

The number of input records is correct, but the number of output records taken from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. The accumulables (taskEnd.taskInfo().accumulables()) are wrong as well: 'number of output rows' should be 1, but it shows 2.
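For completeness, the listener that collects these per-task numbers was along these lines. This is a minimal sketch (the RecordCountListener class name and log format are illustrative; the Spark API calls are real):

```java
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Sketch of a listener that logs per-task record counts on task completion.
public class RecordCountListener extends SparkListener {
    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (taskEnd.taskMetrics() != null) {
            System.out.println("status = " + taskEnd.taskInfo().status()
                + ", recordsRead = "
                + taskEnd.taskMetrics().inputMetrics().recordsRead()
                + ", recordsWritten = "
                + taskEnd.taskMetrics().outputMetrics().recordsWritten());
        }
    }
}

// Registered before starting the query:
// session.sparkContext().addSparkListener(new RecordCountListener());
```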

People

    Assignee: Gabor Somogyi (gsomogyi)
    Reporter: Yuriy Bondaruk (bondyk)
