Beam / BEAM-13203

Potential data loss when using SnsIO.writeAsync

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P1
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: 2.37.0
    • Component/s: io-java-aws
    • Labels: None

    Description

      This needs to be investigated: reading the code suggests we might lose data under certain conditions, e.g. when the pipeline is terminated. The async processing model here is far too simplistic.

      The bundle never learns about pending writes and does not block to wait for them to complete. In the same way, exceptions thrown in the callback go nowhere. Test cases don't catch this because they operate on already completed futures only (so exceptions in the callbacks get thrown on the thread of processElement).

      client.publish(publishRequest).whenComplete((response, ex) -> {
        if (ex == null) {
          SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
          context.output(snsResponse);
        } else {
          LOG.error("Error while publishing request to SNS", ex);
          throw new SnsWriteException("Error while publishing request to SNS", ex);
        }
      }); 
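
One possible way to make the bundle aware of pending writes is to register every future and wait for all of them before the bundle finishes. The sketch below uses only java.util.concurrent and is not Beam's or scio's actual code; the class PendingWrites and its methods track and awaitAll are hypothetical names, intended to be called from processElement and finishBundle respectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical helper (not a Beam API): remembers async writes so a bundle can wait for them. */
final class PendingWrites<R> {
  private final Queue<CompletableFuture<Void>> pending = new ConcurrentLinkedQueue<>();
  private final Queue<R> results = new ConcurrentLinkedQueue<>();
  private final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();

  /** Intended for processElement: keep a handle on the write instead of dropping it. */
  void track(CompletableFuture<R> write) {
    // The callback only records the outcome; it never throws on the client's thread.
    // Assumption: responses are non-null (ConcurrentLinkedQueue rejects null elements).
    pending.add(write.<Void>handle((response, ex) -> {
      if (ex == null) {
        results.add(response);
      } else {
        errors.add(ex);
      }
      return null;
    }));
  }

  /** Intended for finishBundle: block until every tracked write has settled. */
  List<R> awaitAll() {
    CompletableFuture<Void> settled;
    while ((settled = pending.poll()) != null) {
      settled.join(); // returns once the outcome has been recorded above
    }
    Throwable first = errors.poll();
    if (first != null) {
      // Surface failures on the calling thread so the bundle can fail and be retried.
      IllegalStateException failure = new IllegalStateException("Async write failed", first);
      Throwable other;
      while ((other = errors.poll()) != null) {
        failure.addSuppressed(other);
      }
      results.clear(); // results of a failed bundle are dropped in this sketch
      throw failure;
    }
    List<R> done = new ArrayList<>();
    R response;
    while ((response = results.poll()) != null) {
      done.add(response);
    }
    return done;
  }
}
```

With this shape, successful responses are handed back on the thread that calls awaitAll, and a failed write turns into an exception on that same thread rather than inside the client's callback.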

      Also, this entirely removes backpressure from the stream. When used with a much faster source, memory usage keeps growing because the number of concurrent pending async operations is not limited.
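
A common way to restore backpressure is to cap the number of in-flight operations with a semaphore, so the submitting thread blocks once the cap is reached. The following is a minimal sketch under that assumption; BoundedAsync, submit and available are hypothetical names, and the cap would need tuning for a real client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical helper (not a Beam API): limits concurrent pending async operations. */
final class BoundedAsync {
  private final Semaphore permits;

  BoundedAsync(int maxInFlight) {
    this.permits = new Semaphore(maxInFlight);
  }

  /** Blocks the caller while maxInFlight operations are pending, then starts the call. */
  <R> CompletableFuture<R> submit(Supplier<CompletableFuture<R>> call) {
    permits.acquireUninterruptibly(); // this wait is the backpressure on the source
    try {
      // Release the permit on success and on failure alike.
      return call.get().whenComplete((response, ex) -> permits.release());
    } catch (RuntimeException e) {
      permits.release(); // the call never started, so give the permit back
      throw e;
    }
  }

  /** Number of operations that could still be started without blocking. */
  int available() {
    return permits.availablePermits();
  }
}
```

In the snippet from this issue, the equivalent would be wrapping the client.publish(publishRequest) call in such a submit, so that a fast source is slowed down to the rate at which SNS acknowledges writes.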

      Spotify's scio contains a JavaAsyncDoFn that illustrates how it can be done.

            People

              Assignee: Moritz Mack (mosche)
              Reporter: Moritz Mack (mosche)
              Votes: 0
              Watchers: 4

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 40m