  Beam / BEAM-11341

Pipeline using GenerateSequence not working with SDF

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.25.0
    • Fix Version/s: None
    • Component/s: runner-flink
    • Labels: None

    Description

      Originally reported by Tao Li on the mailing list: https://lists.apache.org/thread.html/r20ec55733a0b54018e23e43fe2ca3359b5adf7ad1f98c5ff7a35254a%40%3Cuser.beam.apache.org%3E


      I am running into a problem with “org.apache.beam:beam-runners-flink-1.11:2.25.0” and “org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local testing with the Flink runners in embedded mode. The problem is that I cannot save data into local files using those artifact versions. However, when I switched to “org.apache.beam:beam-runners-flink-1.10:2.24.0”, it worked fine and the output files were saved successfully.

      I am basically generating unbounded data in memory using GenerateSequence transform and saving it into local files. Here is the code that generates unlimited data in memory:

      // One element every 10 ms, assigned to 1-second fixed windows
      pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))
        .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))
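      The following minimal Scala sketch makes the rate and windowing settings in the snippet above concrete. It is plain Scala with no Beam dependency, and it only works through the arithmetic those settings imply. It assumes epoch-millisecond timestamps and a zero window offset. `WindowMath` and its members are hypothetical names. The window-start formula is the usual fixed-window assignment for non-negative timestamps, not a copy of Beam's `FixedWindows` source.

```scala
// Hypothetical sketch of the numbers implied by
// GenerateSequence.from(0).withRate(1, new Duration(10)) and
// FixedWindows.of(Duration.standardSeconds(1)).
// Assumptions: epoch-millisecond timestamps, zero window offset, timestamps >= 0.
object WindowMath {
  // withRate(1, new Duration(10)): one element per 10 ms period.
  val elementsPerPeriod: Long = 1L
  val periodMillis: Long = 10L
  // FixedWindows.of(Duration.standardSeconds(1)): 1000 ms windows.
  val windowSizeMillis: Long = 1000L

  // Elements generated per second at the configured rate.
  def elementsPerSecond: Long = elementsPerPeriod * 1000L / periodMillis

  // Start of the fixed window containing tsMillis.
  def windowStart(tsMillis: Long): Long = tsMillis - (tsMillis % windowSizeMillis)

  // Half-open window [start, start + size) containing tsMillis.
  def windowFor(tsMillis: Long): (Long, Long) = {
    val start = windowStart(tsMillis)
    (start, start + windowSizeMillis)
  }

  def main(args: Array[String]): Unit = {
    println(s"elements per second: $elementsPerSecond")
    println(s"window for t=1234 ms: ${windowFor(1234L)}")
  }
}
```

      At this rate, each one-second window should hold roughly 100 elements. A working pipeline would therefore be expected to finalize output for each one-second window, which matches the per-window file names in the 2.24.0 log entries.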

      I compared the logs and noticed that no write operation appears in the logs with “beam-runners-flink-1.11:2.25.0” and “beam-runners-flink-1.10:2.25.0”. With the working version “beam-runners-flink-1.10:2.24.0”, I found the log entries below, which clearly show the write operation:

      [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results

      [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2, shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet

      [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file /Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2
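      The final file name in the 2.24.0 log above encodes the window and the shard. As an illustration, the hypothetical Scala helper below splits such a name into its parts. The pattern (prefix, window start, window end, shard index, shard count, extension) is inferred from that single log line. It is not Beam's own file-naming code, and `OutputName` and `Parts` are made-up names.

```scala
// Hypothetical helper: the name pattern is inferred from one log line,
// "<prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards>.<ext>",
// and is not taken from Beam's FileIO naming implementation.
object OutputName {
  final case class Parts(windowStart: String, windowEnd: String, shard: Int, numShards: Int)

  // Timestamps look like 2020-11-24T01:33:59.000Z; shard numbers are zero-padded.
  private val NamePattern =
    """.*-(\d{4}-\d{2}-\d{2}T[\d:.]+Z)-(\d{4}-\d{2}-\d{2}T[\d:.]+Z)-(\d+)-of-(\d+)\.\w+""".r

  // Returns None when the name does not follow the inferred pattern.
  def parse(fileName: String): Option[Parts] = fileName match {
    case NamePattern(start, end, shard, numShards) =>
      Some(Parts(start, end, shard.toInt, numShards.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val logged = "/Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet"
    println(parse(logged))
  }
}
```

      Looking for names like this under the output directory is one quick way to check whether any window was finalized. Per the report, no output files are saved with the 2.25.0 runners.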


          People

            Assignee: Unassigned
            Reporter: Kyle Weaver (ibzib)
            Votes: 0
            Watchers: 4
