Flume / FLUME-1045

Proposal to support disk based spooling

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: v1.0.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:
    • Release Note:

      Description

      1. Problem Description
      A sink being unavailable at any stage in the pipeline causes it to back off and retry after a while. Channels associated with such sinks start buffering data, with the caveat that, if you are using a memory channel, this can result in a domino effect on the entire pipeline. There can be legitimate downtimes, e.g., the HDFS sink being down for NameNode maintenance or Hadoop upgrades.

      2. Why not use a durable channel (JDBC, FileChannel)?
      We want high throughput while supporting sink downtime as a first-class use case.

      1. FLUME-1045-2.patch
        15 kB
        Inder SIngh
      2. FLUME-1045-1.patch
        15 kB
        Inder SIngh

        Issue Links

          Activity

          Otis Gospodnetic added a comment -

          Maybe this issue should be Won't Fix or Dupe because FLUME-1227 just got committed?

          Jarek Jarcec Cecho added a comment -

          I agree with Brock, let's move the effort and discussion to FLUME-1227 (I've just shared my thoughts there).

          Jarcec

          Venkatesh Seetharam added a comment -

          @Patrick, I'm not suggesting that the spillable channel would inherit the durability guarantees of the File channel. It should have its own definition; working down from there into implementation, you'd compose the existing channels to achieve the stated goals. The underlying implementation is not exposed to the user.
          I was thinking of using a configurable high watermark in the memory channel and, once it is reached, spilling into the file channel in batches; the overhead you incur is much less than spilling every event, while maintaining the transaction boundaries as well.
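          The batched-spill idea above can be sketched as follows. This is a minimal, hypothetical Python model, not Flume code (the real channels are Java and transactional): `SpillingBuffer` and all its names are invented for illustration, and a plain deque stands in for the file channel.

```python
from collections import deque

class SpillingBuffer:
    """Hypothetical sketch of a memory buffer with a configurable high
    watermark: once occupancy reaches the watermark, a whole batch of the
    oldest events is spilled to the backing (file-like) store, amortizing
    the per-event spill cost."""

    def __init__(self, capacity, high_watermark=0.8, spill_batch=100):
        self.capacity = capacity
        self.watermark = int(capacity * high_watermark)
        self.spill_batch = spill_batch
        self.memory = deque()
        self.spilled = deque()  # stands in for the file channel

    def put(self, event):
        self.memory.append(event)
        if len(self.memory) >= self.watermark:
            # Spill the oldest events in one batch rather than one at a time.
            for _ in range(min(self.spill_batch, len(self.memory))):
                self.spilled.append(self.memory.popleft())

    def take(self):
        # Drain spilled events first so overall FIFO ordering is preserved.
        if self.spilled:
            return self.spilled.popleft()
        return self.memory.popleft() if self.memory else None
```

          A real implementation would additionally have to spill and restore events inside channel transactions; this sketch only illustrates the watermark-and-batch mechanics.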

          Brock Noland added a comment -

          +1 on moving this effort to FLUME-1227

          NO NAME added a comment -

          A major design goal in flume-ng is to keep the channel implementations and interfaces as simple as possible. Trying to enact this as a combination of channels would require two things that I don't think will fly:

          • Adding logic about composing channels (if they are to be combined within an agent)
          • Creating tune-able reliability levels in the FileChannel

          The goal is for each channel to have clear, immutable durability semantics. Of course, there would be some benefits in trying to merge memory + file channel, but they would be outweighed by the complexity costs and general deviation from Flume's design.

          As a result, I would be in favor of having a new channel which basically enacts memory + disk spilling, offers "best effort" semantics but loses data in a subset of cases where the current MemoryChannel loses data (i.e. "better effort"). This is the reason behind FLUME-1227 and I feel somewhat strongly this is the right way to go.

          Inder,

          Your proposal seems to be running a two-level agent topology where a second tier acts as a "spillover" disk-based channel. Frankly, that's not a bad ad-hoc solution to this problem. It would certainly be more useful if the FileChannel had tune-able durability guarantees, as you point out.

          Inder SIngh added a comment -

          @Patrick, @Jarec, @Venkatesh

          What are your thoughts about the above configuration for combining the file and memory channels?
          Making the flush frequency in the file channel configurable could achieve both tight and relaxed semantics for data-loss scenarios.

          This kind of setup would be a shared, hosted setup in any org; different streams may be combined on the same instance, and depending on data importance, different streams would require different semantics.

          NO NAME added a comment -

          Hey I'm just getting caught up on this discussion. One issue (or misunderstanding) that I have with Sharad's proposal, and any of the proposals that seem to suggest a composed "MemoryChannel + File Channel" is what we want here, is that the existing FileChannel has certain transaction guarantees that you would not want in this case.

          If you are running a memory channel and you want to spill over to disk, you are already accepting "best effort" delivery semantics for the normal case where all of the data is fitting in memory.

          If our spillover implementation directly uses, or functionally mirrors, the existing FileChannel, we'll be offering much stronger semantics once the data has spilled over to disk, at a high throughput cost.

          For instance, the FileChannel flushes to disk on every transaction to avoid data loss. If we were to build a disk-spilling extension to the existing MemoryChannel, we'd likely want to batch these disk flushes to make the aggregate disk throughput better. We just wouldn't want the strong semantics offered by the FileChannel.

          That is why I think that just extending the Memory Channel to have some type of best effort disk spilling would be best, since it differs in fundamental ways from what is accomplished with the FileChannel.

          Jarek Jarcec Cecho added a comment -

          @Venkatesh - I did not consider it a non-solvable problem. I was actually planning to add some generic methods about the capacity and occupancy of a channel to the main Channel interface. What I wanted to express is that it's not easily doable at the moment - which is actually the reason why I have not yet started and am letting Patrick handle it.

          Jarcec

          Venkatesh Seetharam added a comment -

          IMHO, what Sharad outlines is a very reasonable approach and should not be hard to tie things together.

          I'm just worried that the current channel interface does not expose the number of stored entries, so questions like "is the memory channel full" or "does the file channel contain less than the memory channel capacity" might not work.

          Jarcec, why do you consider this a problem? Do you consider this leaky? Metrics on top of the channel is necessary IMO and is not leaky.

          Inder SIngh added a comment -

          Since the formatting of the config file is messed up a little by JIRA, please find the config at https://docs.google.com/document/d/1KqhYYBltnwYjaqkDueji0YqJRUY-kOxleiQttGh9FRg/edit

          Inder SIngh added a comment -

          Hello guys,

          I want to reopen this discussion on a totally different note.
          I wanted to configure Flume to achieve a scenario like this:

          AVROSOURCE -> MEMORYCHANNEL -> FAILOVERSINKPROCESSOR -> HDFSSINK (primary)
                                                               -> AVROSINK -> FILECHANNEL -> HDFSSINK

          A detailed diagram explaining this can be found at https://docs.google.com/drawings/d/1qiCASG7YE35G9TtDjVVE_ontHeDlaYmt0MZkGS4yQGw/edit?pli=1

          The configuration which I used to start Flume is something like:

          # ---- CHANNELS ----
          # Define a memory channel called mainchannel on agent1
          agent1.channels.mainchannel.type = memory
          agent1.channels.spoolchannel.type = file

          # ---- SOURCES ----
          agent1.sources.seq-source.type = seq
          agent1.sources.seq-source.channels = mainchannel

          #backup source to run filechannel for spooling
          agent1.sources.avro-source2.type = avro
          agent1.sources.avro-source2.bind = 0.0.0.0
          agent1.sources.avro-source2.port = 41419
          agent1.sources.avro-source2.channels = spoolchannel

          # ---- SINKS ----
          #sink group primary to HDFS and failover to avrosink
          #to spool to file channel

          agent1.sinkgroups.group1.sinks = hdfs-sink avro-spool-sink
          agent1.sinkgroups.group1.processor.type = failover
          agent1.sinkgroups.group1.processor.priority.hdfs-sink = 5
          agent1.sinkgroups.group1.processor.priority.avro-spool-sink = 10

          agent1.sinkgroups.group2.sinks = hdfs-spool-sink
          agent1.sinkgroups.group2.processor.type = default

          agent1.sinks.hdfs-sink.type = hdfs
          agent1.sinks.hdfs-sink.channel = mainchannel
          agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost
          agent1.sinks.hdfs-sink.hdfs.fileType = datastream
          agent1.sinks.hdfs-sink.hdfs.filePrefix = flume-inder3-data/

          #agent1 backup sink is avro sink
          #which reads from mainchannel if hdfs-sink is down
          #and puts it to avro-source2 which will be connected to avro-spool-sink
          agent1.sinks.avro-spool-sink.type = avro
          agent1.sinks.avro-spool-sink.hostname = 0.0.0.0
          agent1.sinks.avro-spool-sink.port = 41419
          agent1.sinks.avro-spool-sink.batch-size = 100
          agent1.sinks.avro-spool-sink.channel = mainchannel

          #sink to despool from file channel
          agent1.sinks.hdfs-spool-sink.type = hdfs
          agent1.sinks.hdfs-spool-sink.channel = spoolchannel
          agent1.sinks.hdfs-spool-sink.hdfs.path = hdfs://localhost
          agent1.sinks.hdfs-spool-sink.hdfs.fileType = datastream
          agent1.sinks.hdfs-spool-sink.hdfs.filePrefix = flume-inder3-data/

          # Finally, now that we've defined all of our components, tell
          # agent1 which ones we want to activate.
          agent1.sources = seq-source avro-source2
          agent1.sinkgroups = group1 group2
          agent1.sinks = hdfs-spool-sink avro-spool-sink hdfs-sink
          agent1.channels = mainchannel spoolchannel

          After starting, my sequence generator source doesn't start and nothing shows up in the log.
          Sample LOG -> http://pastebin.com/9CGhcTqt

          I also tried taking a thread dump -> http://pastebin.com/tub26YrX... all threads are pretty much not doing anything...

          Can folks review the config to see if anything is wrong there?
          Flume doesn't seem to be starting!

          Jarek Jarcec Cecho added a comment -

          Hi Sharad,
          your understanding of FLUME-1201 is correct. We closed that discussion with the conclusion that the entire purpose of the idea is just to support this very specific use case, which is the reason why we've opened the follow-up FLUME-1227.

          Thank you for your feedback, it helped me understand your original proposal. I believe that the proposal makes sense. I'm just worried that the current channel interface does not expose the number of stored entries, so questions like "is the memory channel full" or "does the file channel contain less than memory channel capacity * (some threshold, say 0.2)" might not work.

          Jarcec
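          The missing-introspection concern above can be made concrete. Below is a hypothetical Python sketch, not Flume's actual Channel interface, of the size/capacity accessors any watermark or composition logic would need the channel contract to expose; all names are invented for illustration.

```python
from abc import ABC, abstractmethod

class Channel(ABC):
    """Hypothetical channel contract extended with the introspection methods
    the composed-channel proposals depend on; not Flume's real interface."""

    @abstractmethod
    def put(self, event): ...

    @abstractmethod
    def take(self): ...

    # Additions: without these, questions like "is the memory channel full?"
    # or "is the file backlog below the threshold?" cannot be asked generically.
    @abstractmethod
    def size(self):
        """Number of events currently stored."""

    @abstractmethod
    def capacity(self):
        """Configured maximum number of events."""

    def is_full(self):
        # Derived check usable by any composing channel.
        return self.size() >= self.capacity()
```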

          Sharad Agarwal added a comment -

          What I understand from FLUME-1201 is that it proposes primary and secondary channels as the first class higher level concept in FLUME, where a new channel can be created by composing any arbitrary channels in a hierarchy.
          However, the difference here is that we are creating a very specific channel type whose contract and transaction semantics are clearly well defined. Instead of rewriting the full implementation from scratch, it just leverages the existing implementations. Also note that it depends only on the public, defined contract of the specific channels, not on their internal implementations. This should be OK IMO.

          Jarek Jarcec Cecho added a comment -

          Hi Sharad,
          thank you very much for your input. I believe it's exactly what I've described in FLUME-1227.

          We've discussed the possibility of having "composed" channels in FLUME-1201 and we came to the conclusion that we do not want to do that, for various reasons. Please see the discussion for details. That's the reason why I've explicitly specified in FLUME-1227 that the implementation should be independent of any channel's internal structures.

          What do you think about having that functionality provided by completely independent piece of code?

          Jarcec

          Inder SIngh added a comment -

          Jarcec,

          for your reference, the links are here:

          1. http://incubator.apache.org/kafka/design.html (P.S. - refer to the section "Message Persistence and Caching: Don't fear the filesystem!")
          2. Varnish Cache design (https://www.varnish-cache.org/trac/wiki/ArchitectNotes)

          P.S. - we can definitely add the virtual memory channel using the algo outlined by Sharad above. These links are meant for evaluating the performance of the File Channel w.r.t. sequential disk IO.

          Sharad Agarwal added a comment -

          I think it is useful to have a trade-off between performance and durability. With MemoryChannel and FileChannel already in place, I can think of a very simple implementation like this:

          • Compose MemoryChannel and FileChannel. At any given point in time, this channel acts as a MemoryChannel or a FileChannel.
          • By default, it acts as a memory channel.
          • Switch to file channel mode when the memory channel is full:
            • drain all events from the memory channel into the file channel
            • serve all transactions from the file channel
          • Revert back to memory channel mode when this condition is met:
            remaining events in file channel <= memory channel capacity * (some threshold, say 0.2)
            • drain all events from the file channel into the memory channel

          The above algo would ensure in-order message delivery as well as spooling.

          Thoughts?
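          The mode-switching algorithm outlined above can be sketched as follows. This is a minimal, hypothetical Python model, not Flume code (the real MemoryChannel/FileChannel are Java and transactional); deques stand in for the two underlying channels, and all names are invented for illustration.

```python
from collections import deque

class ComposedChannel:
    """Hypothetical model of the composed channel: act as a memory channel
    by default; when memory fills, drain everything to the file store and
    serve from there; once the file backlog falls below a threshold, drain
    back to memory. FIFO ordering is preserved across both mode switches."""

    def __init__(self, mem_capacity, low_threshold=0.2):
        self.mem_capacity = mem_capacity
        self.low_mark = int(mem_capacity * low_threshold)
        self.memory = deque()
        self.file = deque()  # stands in for the durable FileChannel
        self.mode = "memory"

    def put(self, event):
        if self.mode == "memory" and len(self.memory) >= self.mem_capacity:
            # Memory is full: drain it into the file store, then switch modes.
            while self.memory:
                self.file.append(self.memory.popleft())
            self.mode = "file"
        (self.memory if self.mode == "memory" else self.file).append(event)

    def take(self):
        if self.mode == "file":
            event = self.file.popleft() if self.file else None
            if len(self.file) <= self.low_mark:
                # Backlog is small again: move the remainder back to memory.
                while self.file:
                    self.memory.append(self.file.popleft())
                self.mode = "memory"
            return event
        return self.memory.popleft() if self.memory else None
```

          Because events are always drained head-first in both directions, ordering survives the switches; a real implementation would also have to make each drain happen inside channel transactions.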

          Jarek Jarcec Cecho added a comment -

          Hi Inder,
          could you please link the research you've mentioned?

          Jarcec

          Inder SIngh added a comment -

          Jarek,

          Have you had a chance to play with the FileChannel? With reference to research done by the Kafka folks, sequential disk IO performance isn't bad and is sometimes comparable to main memory, in which case do we really need the virtual memory channel, which anyway would be composed of the memory channel and file channel without much smartness built into it?

          Mike/Arvind/Hari, what are your thoughts?

          Jarek Jarcec Cecho added a comment -

          Agreed.

          Inder SIngh added a comment -

          Jarek,

          the current patch implementation was the old thought; however, after chatting with Arvind and Sharad we finally decided to implement at the channel level, something like a VirtualMemoryChannel.

          As Sharad stated, we can work together on this and get it reviewed by Mike & team. I'll create a one-pager checkpointing the current discussion and attach it, which we can review with everyone and later divide and conquer on implementation.

          Jarek Jarcec Cecho added a comment -

          I've checked the attached patch and it seems that the current implementation is trying to implement the logic on the sink side. I would personally prefer to implement spilling events to disk on the channel side in case it gets full, for whatever reason (networking issues, a maintenance window on Hadoop, ...). It seems a better place for such code, as the sink should be responsible only for sending events, not for caching them.

          Inder, what do you think about that?

          Jarcec

          Sharad Agarwal added a comment -

          This is indeed a useful feature for a lot of folks. Since the goals of both JIRAs are the same, I would recommend doing the work and capturing all discussions in one place, on this JIRA. It would be great if Inder and Jarek, you guys could collaborate on the design and implementation of this. Makes sense?
          Thanks!

          Jarek Jarcec Cecho added a comment -

          I was planning to give it a week or so for discussion and then start working on it. What are your plans, Inder?

          Jarcec

          Inder SIngh added a comment -

          I was planning to modify it based on the review comments received by Arvind. Jarek, is there a timeline you are looking at?

          Jarek Jarcec Cecho added a comment -

          Hi Inder,
          are you still working on that or do you want me to jump in?

          I've opened a similar JIRA, FLUME-1227, as I missed this open ticket, and I would like to start working on that in case this one won't continue.

          Jarcec

          Inder SIngh added a comment -

          Arvind,

          I'll work with Sharad to vet out the details of the so-called VirtualMemoryChannel. Will post soon for reviews.

          Arvind Prabhakar added a comment -

          Curious to know: what is the right way in the current Flume architecture to trade off transactional guarantees in a very high-throughput system, providing a certain degree of reliability in case the next link is down?

          This seems to be a confusion between design and implementation. The design requires that the channel expose transactional semantics. The channel implementation decides how strictly those semantics are enforced. For example, the transactional semantics implemented by the JDBC channel are very strict, whereas those implemented by the Memory channel are weak.

          However, since the design requires both these channels to expose transactional semantics, you can switch the channels to suite your flow needs.

          The solution being discussed here - disk based spooling on the sink side - goes outside the scope of this design to accommodate throughput requirements. If implemented, the messages that are spooled will be outside of the transaction boundary and thus will invalidate the safety guarantee of the system.

          One solution I can think of, where the IO cost is incurred only on failures and things remain transactional: wrap the MemoryChannel and FileChannel into a new channel, say SpoolingMemoryChannel. Events flow via the memory channel; on reaching the memory channel's buffer capacity, events are spooled into the FileChannel. Since the underlying channels are transactional, SpoolingMemoryChannel can also easily be made transactional.

          This sounds like a promising solution. The key thing to watch out for here is the ordering requirement. In general, channels are expected to preserve the order of events. As long as that is taken care of and the transactional semantics make sense, this could be a stop-gap solution until we have a high-throughput file-based channel implemented.
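The composition discussed here can be sketched as a bounded in-memory queue that spills to a spool file when full and de-spools when memory drains. The following is a toy sketch in plain Java, using Strings instead of Flume Events; the class and method names are illustrative, not Flume APIs. It also illustrates the ordering caveat above: once anything has been spooled, new puts must also go to the spool, or takes would reorder events.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy sketch of the composed channel: memory first, spill to disk on
// overflow, de-spool when memory runs dry. Not a Flume Channel; a real
// implementation would carry Events inside channel transactions.
public class SpoolingMemoryQueue {
    private final int capacity;
    private final Deque<String> memory = new ArrayDeque<>();
    private final Path spoolFile;
    private long spooledCount = 0;

    public SpoolingMemoryQueue(int capacity, Path spoolFile) {
        this.capacity = capacity;
        this.spoolFile = spoolFile;
    }

    // Once anything sits in the spool, new events must also go to the
    // spool; otherwise FIFO order across the memory/disk boundary breaks.
    public synchronized void put(String event) throws IOException {
        if (spooledCount == 0 && memory.size() < capacity) {
            memory.addLast(event);
        } else {
            Files.write(spoolFile,
                    (event + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            spooledCount++;
        }
    }

    // Drain memory first; when empty, de-spool the whole file in one go
    // (one file at a time, as in the proposal below). Returns null when
    // both memory and spool are empty.
    public synchronized String take() throws IOException {
        if (memory.isEmpty() && spooledCount > 0) {
            for (String line : Files.readAllLines(spoolFile, StandardCharsets.UTF_8)) {
                memory.addLast(line);
            }
            Files.delete(spoolFile);
            spooledCount = 0;
        }
        return memory.pollFirst();
    }
}
```

In a real Flume channel composition, the spool append and the memory removal would have to happen inside the same transaction for the safety guarantee discussed above to hold.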

          Sharad Agarwal added a comment -

          as it violates the transactional exchange invariant of the design

          Some systems have very high throughput requirements and relaxed transaction needs. Typically these applications want the system to run at very high throughput and, in case of failures, are OK to lose or replay a small number of events.
          FileChannel intends to be fully transactional and also high throughput. However, it will be IO/disk bound.

          1. Curious to know what is the right way in the current Flume architecture to trade off transactional guarantees in a very high throughput system, while providing a certain degree of reliability in case the next link is down?

          2. One solution I can think of, where the IO cost is incurred only on failures and things remain transactional:
          Wrap the MemoryChannel and FileChannel into a new channel, say SpoolingMemoryChannel. Events flow via the memory channel; on reaching the memory channel's buffer capacity, events are spooled into the FileChannel. Since the underlying channels are transactional, SpoolingMemoryChannel can also easily be made transactional.

          Arvind Prabhakar added a comment -

          Inder - yes, the File Channel is work in progress and should be available soon. I don't think the suggested workaround above is correct for the meantime, as it violates the transactional exchange invariant of the design.

          What is your time frame for requiring the file channel? Could you possibly use the JDBC channel until that is ready? If you have tried it and have found issues with it, please let us know.

          Inder SIngh added a comment -

          Arvind,

          Thanks, and appreciate your prompt feedback. I understand that it doesn't directly fit the abstractions of Flume in its current state. Not sure whether thinking about it as a transient sink makes it any better? We were contemplating replacing an existing system with Flume, and in its current state there were concerns around operability of the system when using a memory channel.

          Building a file channel that supports transaction semantics across multiple source & sink threads is challenging, and I believe it is WIP. This could act as a good alternative for folks using the memory channel and avoid the ripple effect of a sink/agent being down. Worst case, we could use it as a stop-gap solution till a high-throughput file channel is available.

          Please share your thoughts. If you agree, I believe the concerns you highlighted could be worked on incrementally with your inputs; otherwise please advise on the correct route to take here.

          Arvind Prabhakar added a comment -

          Hi Inder - thanks for taking the initiative to provide this functionality. However, I am not sure this fits well with the design, for the following reasons:

          • When a sink spools events, they get removed from the channel, thereby losing the transactional exchange guarantee for the next hop. This implies that the flow is no longer reliable.
          • The domino effect of channels reaching capacity is not really a problem. In fact, this is the main value add of buffered flows: it allows every agent to queue up events while waiting for the destination to get unblocked.
          • Even when implemented, the problem will still remain, as the spool may fill to capacity, causing the said domino effect.

          As you have noted, the motivation for this change is to have a high-throughput flow that supports sink downtimes. The expected way to address this is to go with a high-performance channel that is capable of delivering the same throughput levels as the Memory channel without being limited to the available system memory.

          Inder SIngh added a comment - - edited

          Uploading the right patch. Please refer to https://issues.apache.org/jira/secure/attachment/12519425/FLUME-1045-2.patch as the correct version.

          Inder SIngh added a comment - - edited

          Initial version of the patch contains the following:

          1. Skeleton code for the disk-based solution proposed in this JIRA.

          Disclaimer - this patch isn't functional/complete yet.
          The objective is to get community consensus around the proposal.

          Inder SIngh added a comment -

          Proposed Solution
          ------------------

          Sink triggered spooling
          ----------------------------
          A sink going down (or all sinks going down in a failover-policy setup) triggers spooling of data from the channel to local disk. As and when there is a successful commit from the channel to one of the sinks, a de-spool is triggered from local disk back to the channel.

          Proposed Implementation
          ---------------------------

          1. SpooledFailoverSinkProcessor – extending from FailoverSinkProcessor. Capabilities include triggering spool() and despool() when the sink goes down and comes up, respectively.

          Some more design choices & assumptions
          ----------------------------------------
          1. Persist Avro-serialized objects on local disk, which preserves data & headers.
          2. Use channel-based transaction semantics while spooling to avoid any data loss.
          3. Spool location is configurable for each SinkGroup, controlled by "spool-dir". Events will be spooled in batches controlled by "spool-batch-size". Spool files will be rolled over after they reach a size controlled by "spoolfile-size".
          4. Validation to avoid misconfiguration of overlapping spool locations across SinkGroups.
          5. De-spooling happens one file at a time to avoid the complexity of persisting offsets in the first cut.
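Under these choices, wiring the processor into a sink group might look like the following sketch. Only the spool-dir, spool-batch-size, and spoolfile-size keys come from the proposal above; the agent name, sink names, processor type string, and the example values are hypothetical.

```properties
# Hypothetical configuration for the proposed SpooledFailoverSinkProcessor
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = hdfsSink1 hdfsSink2
agent1.sinkgroups.g1.processor.type = spooled_failover
agent1.sinkgroups.g1.processor.priority.hdfsSink1 = 10
agent1.sinkgroups.g1.processor.priority.hdfsSink2 = 5
# Per-SinkGroup spool location (validated for overlap across groups)
agent1.sinkgroups.g1.processor.spool-dir = /var/flume/spool/g1
# Events are spooled in batches of this size
agent1.sinkgroups.g1.processor.spool-batch-size = 1000
# Spool files roll over after reaching this size in bytes
agent1.sinkgroups.g1.processor.spoolfile-size = 67108864
```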


            People

            • Assignee:
              Unassigned
              Reporter:
              Inder SIngh
            • Votes:
              0
              Watchers:
              7

              Dates

              • Created:
                Updated:

                Development