FLUME-1435: Proposal of Transactional Multiplex (fan out) Sink

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: v1.2.0
    • Fix Version/s: None
    • Component/s: Channel, Sinks+Sources
    • Labels: None

      Description

      Hi,

      I proposed this design by email several weeks ago and received a comment from Brock. I guess you are all very busy, so I think it is better to create this JIRA and attach the slides and patch here to explain the proposal more clearly.

      Regards,
      Yongkun

      The following is the design from the previous email; I will attach the slides later.

      From: "Wang, Yongkun" <yongkun.wang@mail.rakuten.com>
      Date: Wed, 25 Jul 2012 10:32:31 GMT
      To: "dev@flume.apache.org" <dev@flume.apache.org>
      Cc: "user@flume.apache.org" <user@flume.apache.org>
      Subject: Transactional Multiplex (fan out) Sink

      Hi,

      In our system, we need to fan out the aggregated flow to several destinations. Usually the flow to each destination is identical.

      Flume NG has a nice feature, the "multiplexing flow", which can satisfy our requirements. It is implemented with separate channels, which makes transaction control easy.

      But in our case the fan out is replicating in most cases. With the current "Replicating to Channels" configuration, we get several identical channels on the same host, which may consume a large amount of resources (memory, disk, etc.). Performance may drop, and the events delivered to each destination may not stay synchronized.
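      For reference, today's replicating fan-out looks roughly like the configuration below (all component names are illustrative; "replicating" is the default selector type). Note the one full channel per destination:

      agent.sources = src
      agent.channels = ch1 ch2
      agent.sinks = snk1 snk2

      agent.sources.src.type = netcat
      agent.sources.src.bind = 0.0.0.0
      agent.sources.src.port = 44444
      # Every event is copied into both channels.
      agent.sources.src.selector.type = replicating
      agent.sources.src.channels = ch1 ch2

      # Two identical buffers on the same host: the resource cost described above.
      agent.channels.ch1.type = memory
      agent.channels.ch2.type = memory

      agent.sinks.snk1.type = logger
      agent.sinks.snk1.channel = ch1
      agent.sinks.snk2.type = logger
      agent.sinks.snk2.channel = ch2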

      Having read the NG source, I think the multiplexing could be moved from the Channel to the Sink: use a single Channel and fan out to different Sinks, which may solve the problems (resource usage, performance, event synchronization) of multiple Channels.

      I see that there are "LoadBalancingSinkProcessor" and "SinkSelector" classes, but they cannot be used to replicate events from one Channel to different Sinks.

      The following is a possible implementation of the Transactional Multiplex (or fan out) Sink; a rough sketch follows the list:
      1. Add a Transactional Multiplex Sink Processor, which groups the operations of all fan-out Sinks into one transaction and commits the transaction according to a configurable policy.
      2. Add a MultiplexSink, which simply processes the Events and reports status, with no transaction handling of its own.
      3. Add "peek()" and "remove()" to Channel and Transaction.

      The policy of committing a transaction can be defined as follows (suppose we have N Sinks):
      1. Commit when any M (0 <= M <= N) Sinks succeed;
      e.g. values: ANY, ONE, QUORUM, ALL
      2. Commit when M (0 < M <= N) specified Sinks (the important sinks) succeed.
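      A small sketch of how the first policy could translate into the success threshold used above (the enum and method are illustrative, not existing Flume API):

      /* Hypothetical commit policy for the proposed processor. */
      enum CommitPolicy {
        ANY, ONE, QUORUM, ALL;

        /* True once enough of the N fan-out sinks have succeeded. */
        boolean isSatisfied(int successes, int totalSinks) {
          switch (this) {
            case ANY:    // treated the same as ONE here
            case ONE:    return successes >= 1;
            case QUORUM: return successes > totalSinks / 2;
            case ALL:    return successes == totalSinks;
            default:     return false;
          }
        }
      }

      The second policy (a fixed set of important Sinks) could be modeled the same way, with a set of sink names that must all appear among the successes.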

      Optionally, a selector can also be used by the Transactional Multiplex Sink Processor to filter the events delivered to some Sinks.

      This can also be combined with the existing multiplexing channel flow: multiplex events into different Channels, and each Channel can then replicate to its own set of Sinks.
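      The multiplexing half of that combination already exists in Flume; a rough configuration sketch is below (the header name and mappings are invented, and the per-channel fan-out processor is the proposed piece, so it appears only as a commented-out placeholder):

      # Route events to channels by the value of an (illustrative) header.
      agent.sources.src.selector.type = multiplexing
      agent.sources.src.selector.header = logType
      agent.sources.src.selector.mapping.access = ch1
      agent.sources.src.selector.mapping.error = ch2
      agent.sources.src.selector.default = ch2

      # Hypothetical, not in Flume 1.x: each channel would then fan out
      # to several sinks through the proposed transactional multiplex
      # sink processor.
      # agent.sinkgroups.g1.processor.type = transactional_multiplex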

      I would like to hear your suggestions first. If the idea is reasonable, I will create a JIRA ticket and provide a patch for review.

      Cheers,
      Yongkun Wang (Kun)


          Activity

          Yongkun Wang added a comment -

          Nice comment from Mike.

          On 12/08/13 9:17, "Mike Percy" <mpercy@apache.org> wrote:

          Hi,
          Due to design decisions made very early on in Flume NG - specifically the
          fact that Sink only has a simple process() method - I don't see a good way
          to get multiple sinks pulling from the same channel in a way that is
          backwards-compatible with the current implementation.

          Probably the "right" way to support this would be to have an interface
          where the SinkRunner (or something outside of each Sink) is in control of
          the transaction, and then it can easily send events to each sink serially
          or in parallel within a single transaction. I think that is basically what
          you are describing. If you look at SourceRunner and SourceProcessor you
          will see similar ideas to what you are describing but they are only
          implemented at the Source->Channel level. The current SinkProcessor is not
          an analog of SourceProcessor, but if it was then I think that's where this
          functionality might fit. However what happens when you do that is you have
          to handle a ton of failure cases and threading models in a very general
          way, which might be tough to get right for all use cases. I'm not 100%
          sure, but I think that's why this was not pursued at the time.
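          As a rough illustration of the interface Mike describes above, a runner that owns the transaction might look like the sketch below. BatchSink and TransactionOwningRunner are invented names; Channel.take(), Transaction, and EventDeliveryException are real Flume 1.x API:

          import java.util.ArrayList;
          import java.util.List;

          import org.apache.flume.Channel;
          import org.apache.flume.Event;
          import org.apache.flume.EventDeliveryException;
          import org.apache.flume.Transaction;

          /* Hypothetical sink interface: the caller, not the sink, owns the
             transaction. */
          interface BatchSink {
            void process(List<Event> batch) throws EventDeliveryException;
          }

          /* Sketch of a runner holding one transaction across several sinks. */
          class TransactionOwningRunner {
            private final Channel channel;
            private final List<BatchSink> sinks;
            private final int batchSize;

            TransactionOwningRunner(Channel channel, List<BatchSink> sinks,
                int batchSize) {
              this.channel = channel;
              this.sinks = sinks;
              this.batchSize = batchSize;
            }

            void runOnce() throws EventDeliveryException {
              Transaction tx = channel.getTransaction();
              tx.begin();
              try {
                List<Event> batch = new ArrayList<Event>();
                for (int i = 0; i < batchSize; i++) {
                  Event e = channel.take(); // real Channel API
                  if (e == null) {
                    break;
                  }
                  batch.add(e);
                }
                for (BatchSink sink : sinks) { // serial here; could be parallel
                  sink.process(batch);         // any failure aborts the whole batch
                }
                tx.commit();
              } catch (Exception e) {
                tx.rollback(); // the taken events return to the channel
                throw new EventDeliveryException("fan-out delivery failed", e);
              } finally {
                tx.close();
              }
            }
          }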

          To me, this seems like a potential design change (it would have to be very
          carefully thought out) to consider for a future major Flume code line
          (maybe a Flume 2.x).

          By the way, if one is trying to get maximum throughput, then duplicating events onto multiple channels, and having different threads running the sinks (the current design), will be faster and more resilient in general than a single thread and a single channel writing to multiple sinks/destinations. The multiple-channel design pattern allows periodic downtimes or delays on a single sink to not affect the others, assuming the channel sizes are large enough for buffering during downtime and assuming that each sink is fast enough to recover from temporary delays. Without a dedicated buffer per destination, one is at the mercy of the slowest sink at every stage in the transaction.

          One last thing worth noting is that the current channels are all well ordered. This means that Flume currently provides a weak ordering guarantee (across a single hop). That is a helpful property in the context of testing and validation, and it is what many people expect if they are storing logs on a single hop. I hope we don't backpedal on that weak ordering guarantee without a really good reason.

          Regards,
          Mike


            People

            • Assignee: Yongkun Wang
            • Reporter: Yongkun Wang
            • Votes: 0
            • Watchers: 5
