Description
I'm using SparkListener.onTaskEnd() to capture input and output metrics, but it seems that the number of written records (taskEnd.taskMetrics().outputMetrics().recordsWritten()) is incorrect. Here is my stream construction:
StreamingQuery writeStream = session
    .readStream()
    .schema(RecordSchema.fromClass(TestRecord.class))
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv(inputFolder.getRoot().toPath().toString())
    .as(Encoders.bean(TestRecord.class))
    .flatMap(
        ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
            List<TestVendingRecord> resultIterable = new ArrayList<>();
            try {
                TestVendingRecord result = transformer.convert(u);
                resultIterable.add(result);
            } catch (Throwable t) {
                System.err.println("Ooops");
                t.printStackTrace();
            }
            return resultIterable.iterator();
        }),
        Encoders.bean(TestVendingRecord.class))
    .writeStream()
    .outputMode(OutputMode.Append())
    .format("parquet")
    .option("path", outputFolder.getRoot().toPath().toString())
    .option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
    .start();

writeStream.processAllAvailable();
writeStream.stop();
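For reference, the listener producing the log lines below is essentially the following (a trimmed sketch of TestMain, so the class name here is illustrative; it is registered with session.sparkContext().addSparkListener(...)):

import org.apache.spark.scheduler.AccumulableInfo;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import scala.collection.JavaConverters;

// Sketch of the listener in TestMain: dumps task status, I/O metrics,
// and every accumulable attached to the finished task.
public class MetricsListener extends SparkListener {
    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        System.out.println("-----------status--> " + taskEnd.taskInfo().status());
        System.out.println("-----------recordsWritten--> "
                + taskEnd.taskMetrics().outputMetrics().recordsWritten());
        System.out.println("-----------recordsRead-----> "
                + taskEnd.taskMetrics().inputMetrics().recordsRead());
        System.out.println("taskEnd.taskInfo().accumulables():");
        for (AccumulableInfo acc : JavaConverters
                .seqAsJavaListConverter(taskEnd.taskInfo().accumulables()).asJava()) {
            System.out.println("name = " + (acc.name().isDefined() ? acc.name().get() : "<unnamed>"));
            System.out.println("value = " + (acc.value().isDefined() ? acc.value().get() : "<none>"));
        }
    }
}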
Tested it with one good and one bad input record (the bad one throws an exception inside transformer.convert(u)).
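The input amounts to two tab-delimited rows, roughly like this (illustrative only: the real TestRecord fields and the exact value that breaks the converter are not shown in this report):

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;

// Hypothetical test input: two TSV rows matching TestRecord's schema.
// The second row is crafted so that transformer.convert(u) throws.
File input = inputFolder.newFile("records.tsv");
Files.write(input.toPath(), Arrays.asList(
        "good-id\t42",            // converts cleanly
        "bad-id\tnot-a-number"),  // makes transformer.convert(u) throw
        StandardCharsets.UTF_8);

Running the query over these two records produces the following metrics: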
(TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
(TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
(TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
(TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
(TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
(TestMain.java:onTaskEnd(85)) - value = 323
(TestMain.java:onTaskEnd(84)) - name = number of output rows
(TestMain.java:onTaskEnd(85)) - value = 2
(TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
(TestMain.java:onTaskEnd(85)) - value = 364
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
(TestMain.java:onTaskEnd(85)) - value = 2
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
(TestMain.java:onTaskEnd(85)) - value = 157
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
(TestMain.java:onTaskEnd(85)) - value = 3
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
(TestMain.java:onTaskEnd(85)) - value = 2396
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
(TestMain.java:onTaskEnd(85)) - value = 633807000
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
(TestMain.java:onTaskEnd(85)) - value = 683
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
(TestMain.java:onTaskEnd(85)) - value = 55662000
(TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
(TestMain.java:onTaskEnd(85)) - value = 58
(TestMain.java:onTaskEnd(89)) - input records 2

Streaming query made progress: {
  "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
  "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
  "name" : null,
  "timestamp" : "2018-01-26T14:44:05.362Z",
  "numInputRows" : 2,
  "processedRowsPerSecond" : 0.8163265306122448,
  "durationMs" : {
    "addBatch" : 1994,
    "getBatch" : 126,
    "getOffset" : 52,
    "queryPlanning" : 220,
    "triggerExecution" : 2450,
    "walCommit" : 41
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 2,
    "processedRowsPerSecond" : 0.8163265306122448
  } ],
  "sink" : {
    "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
  }
}
The number of input records is correct, but the number of output records taken from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. The accumulables (taskEnd.taskInfo().accumulables()) don't hold the correct value either: 'number of output rows' should be 1, since one of the two records is dropped by the failing conversion, but it shows 2.
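As a cross-check, one can count the output with a named accumulator bumped inside the flatMap itself, independent of the task-level output metrics. A sketch (the accumulator name is mine); this should report what the lambda actually emitted, which is 1 for the run above:

import org.apache.spark.util.LongAccumulator;

// Count successful conversions ourselves instead of trusting
// taskEnd.taskMetrics().outputMetrics().recordsWritten().
LongAccumulator emitted = session.sparkContext().longAccumulator("emittedRecords");

// ... inside the flatMap lambda, on the success path:
//     TestVendingRecord result = transformer.convert(u);
//     resultIterable.add(result);
//     emitted.add(1L);  // counts only records that actually converted

// ... after writeStream.processAllAvailable():
System.out.println("records actually emitted = " + emitted.value());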