  1. Samza
  2. SAMZA-2506

Inconsistent end of stream semantics in SystemStreamPartitionMetadata

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5
    • Component/s: None
    • Labels: None

    Description

      When consuming a bounded stream (HdfsSystemConsumer or InMemorySystemConsumer), an IncomingMessageEnvelope with object = EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET is used to delimit the stream and signal that there is no more data to be consumed.

      However, InMemorySystemAdmin, HdfsSystemAdmin, and SystemStreamPartitionMetadata each assign a different meaning to "oldestOffset", "newestOffset", and "upcomingOffset" for bounded streams, which is problematic.

      As described in SystemStreamPartitionMetadata:

      /**
       * @return The oldest offset that still exists in the stream for the
       *         partition given. If a partition has two messages with offsets 0
       *         and 1, respectively, then this method would return 0 for the
       *         oldest offset. This offset is useful when one wishes to read all
       *         messages in a stream from the very beginning. A null value means
       *         the stream is empty.
       */
      public String getOldestOffset() {
        return oldestOffset;
      }
      
      /**
       * @return The newest offset that exists in the stream for the partition
       *         given. If a partition has two messages with offsets 0 and 1,
       *         respectively, then this method would return 1 for the newest
       *         offset. This offset is useful when one wishes to see if all
       *         messages have been read from a stream (offset of last message
       *         read == newest offset). A null value means the stream is empty.
       */
      public String getNewestOffset() {
        return newestOffset;
      }
      
      /**
       * @return The offset that represents the next message to be written in the
       *         stream for the partition given. If a partition has two messages
       *         with offsets 0 and 1, respectively, then this method would return
       *         2 for the upcoming offset. This offset is useful when one wishes
       *         to pick up reading at the very end of a stream. A null value
       *         means the stream is empty.
       */
      public String getUpcomingOffset() {
        return upcomingOffset;
      }
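      To make the documented contract concrete, here is a minimal sketch for the two-message partition used in the Javadoc above. PartitionOffsets is a hypothetical stand-in, not Samza's SystemStreamPartitionMetadata, and it assumes offsets are simply the message indexes rendered as strings.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for SystemStreamPartitionMetadata; not Samza code. */
final class PartitionOffsets {
  final String oldestOffset;
  final String newestOffset;
  final String upcomingOffset;

  PartitionOffsets(String oldestOffset, String newestOffset, String upcomingOffset) {
    this.oldestOffset = oldestOffset;
    this.newestOffset = newestOffset;
    this.upcomingOffset = upcomingOffset;
  }

  /** Builds metadata per the Javadoc contract, assuming offsets are the indexes 0..n-1. */
  static PartitionOffsets forMessages(List<String> messages) {
    if (messages.isEmpty()) {
      // Per the Javadoc, null means the stream is empty.
      return new PartitionOffsets(null, null, null);
    }
    int last = messages.size() - 1;
    return new PartitionOffsets("0", String.valueOf(last), String.valueOf(last + 1));
  }

  /** The "all messages read" check described in the getNewestOffset Javadoc. */
  boolean isCaughtUp(String lastReadOffset) {
    return lastReadOffset != null && lastReadOffset.equals(newestOffset);
  }

  public static void main(String[] args) {
    PartitionOffsets m = forMessages(Arrays.asList("first", "second"));
    System.out.println(m.oldestOffset + " " + m.newestOffset + " " + m.upcomingOffset);
  }
}
```

      For two messages this yields oldest = 0, newest = 1, and upcoming = 2, matching the Javadoc examples.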
      

       

      Areas of concern are called out in the bullet points under each offset below.

       

      In the case of HdfsSystemConsumer, messages are read from HDFS files until all have been read, and an end of stream envelope is then appended to the end of the message buffer. The offset metadata returned per partition is:

      oldestOffset: offset of the beginning of the first file in HDFS

      newestOffset: offset of the beginning of the last file in HDFS

      • Does NOT include end of stream in numeric offset count

      upcomingOffset: null

      • Per SystemStreamPartitionMetadata, "null" is meant to indicate empty. However, null seems reasonable as "upcoming" does not make much sense in the context of bounded streams.

      reference: https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java#L210

       

       

       

      Further differences are seen in InMemorySystem, where an IncomingMessageEnvelope is placed at the end of the buffer similar to the HdfsSystemConsumer case above, with an object = EndOfStreamMessage and offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.

      However, InMemorySystemAdmin will generate the following metadata per partition:

      oldestOffset: 0

      newestOffset: numeric index of last message in the buffer

      • This will be the index of the end of stream message, which does not match the HDFS semantics above

      upcomingOffset: newestOffset + 1

      • This is reasonable, but again upcoming does not make much sense in the context of bounded streams

      reference: https://github.com/Sanil15/samza/blob/master/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java#L182
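      The in-memory semantics described above can be illustrated with a small model. This is only a sketch, not the InMemoryManager implementation: BoundedBufferModel and its END_OF_STREAM marker are hypothetical, and offsets are assumed to be plain buffer indexes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a bounded in-memory partition; not Samza's InMemoryManager. */
final class BoundedBufferModel {
  // Stand-in for an envelope whose object is an EndOfStreamMessage.
  static final String END_OF_STREAM = "<end-of-stream>";

  /** Buffer as described above: the data messages followed by the end of stream marker. */
  static List<String> buffer(List<String> data) {
    List<String> buffered = new ArrayList<>(data);
    buffered.add(END_OF_STREAM);
    return buffered;
  }

  /** newestOffset as described for InMemorySystemAdmin: index of the last buffered entry. */
  static int newestOffset(List<String> buffered) {
    return buffered.size() - 1;
  }

  /** upcomingOffset as described for InMemorySystemAdmin: newestOffset + 1. */
  static int upcomingOffset(List<String> buffered) {
    return newestOffset(buffered) + 1;
  }

  public static void main(String[] args) {
    List<String> buffered = buffer(Arrays.asList("m0", "m1"));
    // The entry at newestOffset is the marker, not the last data message.
    System.out.println(newestOffset(buffered) + " -> " + buffered.get(newestOffset(buffered)));
  }
}
```

      With two data messages, newestOffset is 2 and points at the end of stream marker, while the last data message sits at index 1.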

       

       

      Impact:

      tl;dr: this inconsistency blocks refactoring side input consumption to leverage RunLoop

      Side input consumption currently blocks the main container startup thread in ContainerStorageManager (CSM) until, for each side input SSP, one of the following holds:

      1. envelope from SystemConsumer returns true for envelope.isEndOfStream (the offset of the envelope is IncomingMessageEnvelope.END_OF_STREAM_OFFSET)
      2. envelope offset is equal to newest offset for that SSP fetched at CSM initialization

      We would like to refactor this flow to, rather than using SystemConsumers directly, leverage RunLoop to take advantage of message dispatching, concurrency, etc.

      Since RunLoop sinks end of stream messages, condition 1 can no longer be checked, and CSM needs a consistent way to determine that an SSP's consumption has reached "head" or "end" according to the offset metadata fetched at init time. With InMemorySystemAdmin, newestOffset is the index of the end of stream message, which RunLoop never delivers, so condition 2 is never met either. This causes integration tests to fail, as CSM waits indefinitely for side input SSP consumption to be marked as "caught up".
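      The failure mode can be reproduced in a small simulation. This is a sketch under stated assumptions, not ContainerStorageManager or RunLoop code: Envelope is a hypothetical stand-in for IncomingMessageEnvelope, the "END_OF_STREAM" offset string is a placeholder, and the dropEndOfStream flag models RunLoop sinking end of stream envelopes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the side input caught-up check; not Samza code. */
final class CaughtUpSimulation {
  /** Minimal stand-in for IncomingMessageEnvelope. */
  static final class Envelope {
    final String offset;
    final boolean endOfStream;

    Envelope(String offset, boolean endOfStream) {
      this.offset = offset;
      this.endOfStream = endOfStream;
    }
  }

  /** Two data messages followed by an end of stream envelope. */
  static List<Envelope> boundedStream() {
    List<Envelope> envelopes = new ArrayList<>();
    envelopes.add(new Envelope("0", false));
    envelopes.add(new Envelope("1", false));
    envelopes.add(new Envelope("END_OF_STREAM", true)); // placeholder offset value
    return envelopes;
  }

  /**
   * Returns true if consumption is ever marked caught up.
   * dropEndOfStream = true models RunLoop sinking end of stream envelopes.
   */
  static boolean reachesCaughtUp(List<Envelope> envelopes, String newestOffset, boolean dropEndOfStream) {
    for (Envelope e : envelopes) {
      if (e.endOfStream) {
        if (dropEndOfStream) {
          continue; // condition 1 is never observed
        }
        return true; // condition 1: end of stream envelope seen
      }
      if (e.offset.equals(newestOffset)) {
        return true; // condition 2: reached the newest offset fetched at init
      }
    }
    return false; // a real container would keep waiting here
  }

  public static void main(String[] args) {
    List<Envelope> stream = boundedStream();
    System.out.println(reachesCaughtUp(stream, "2", false)); // direct consumer, in-memory newestOffset
    System.out.println(reachesCaughtUp(stream, "2", true));  // end of stream sunk, in-memory newestOffset
    System.out.println(reachesCaughtUp(stream, "1", true));  // end of stream sunk, newestOffset before the marker
  }
}
```

      In this model, a newestOffset of "2" (the index of the end of stream envelope) is only reachable when the end of stream envelope is delivered, whereas a newestOffset of "1" is reached either way.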

      Possible (partial) reconciliation:   

      1. Modify InMemorySystemAdmin to return "newestOffset" as the numeric index of the buffered message immediately preceding the end of stream message. This would make the semantics of "newestOffset" match that of HdfsSystemAdmin.
        1.  Minimal semantic change in a component used primarily internally in Samza (InMemorySystem is used in the test runner)
        2.  Semantics of "upcomingOffset" would be unchanged and left inconsistent with HdfsSystemAdmin
      2. Modify RunLoop to pass end of stream messages through to tasks, either directly or by invoking a callback
        1.  EndOfStreamListenerTask today is task-wide, and only called when ALL SSPs in a task reach end of stream. Giving SSP granularity might be welcome flexibility to customers who intermingle bounded / unbounded streams in the same task.
        2.  Uncertain whether customers would actually benefit, and significant API change.
      3. ??

      Recommendation:

      Solution 1 above is cheap and minimizes changes to the externally facing Samza API, as InMemorySystem is primarily used in the internals of Samza's test runner.
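      A minimal sketch of what solution 1 could look like follows. It is not the change that was merged: ProposedNewestOffset and its END_OF_STREAM marker are hypothetical, offsets are assumed to be buffer indexes, and returning null when no data message exists is an assumption borrowed from the "null means empty" wording in SystemStreamPartitionMetadata.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of solution 1; not the actual InMemorySystemAdmin change. */
final class ProposedNewestOffset {
  // Stand-in for an envelope whose object is an EndOfStreamMessage.
  static final String END_OF_STREAM = "<end-of-stream>";

  /**
   * Returns the index of the last buffered entry that precedes a trailing end of
   * stream marker, or null if the buffer holds no data message (assumption).
   */
  static String newestOffset(List<String> buffered) {
    int last = buffered.size() - 1;
    if (last >= 0 && END_OF_STREAM.equals(buffered.get(last))) {
      last--; // skip the trailing end of stream marker
    }
    return last >= 0 ? String.valueOf(last) : null;
  }

  public static void main(String[] args) {
    System.out.println(newestOffset(Arrays.asList("m0", "m1", END_OF_STREAM)));
  }
}
```

      Under these assumptions, a buffer of two data messages plus the marker reports a newestOffset of 1, so a consumer that never sees the marker can still detect that it has caught up.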

            People

              Assignee: Unassigned
              Reporter: Brett Konold (bkonold)
                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 40m