Flume / FLUME-952

Modifying SinkRunner to be pluggable to allow for failover/replication.

    Details

    • Type: Brainstorming
    • Status: Resolved
    • Priority: Major
    • Resolution: Not a Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      While implementing the failover sink runner, the following was suggested:
      1. This needs to be implemented on top of FLUME-949 which deals with removing the notion of a PollableSink altogether. As a result, the SinkRunner will become a concrete implementation that can then allow different sink handling policies - such as either a failover policy (needed for this issue), or load balancing policy (not needed for this issue). Hence the policy part needs to be pluggable rather than the sink runner itself. An example of such a construct is the ChannelSelector and ChannelProcessor implementations.

      In FLUME-865 I have implemented FailoverSinkRunner as a separate runner, but I am open to the idea of making it pluggable if it makes the code more maintainable.

      As is, there are many differences between the requirements for Failover and a normal Sink runner, including configuration, initialisation, shutdown, error handling and event processing. If we were to make this pluggable, many hooks would be needed and I don't think there is that much common behavior that warrants using a pluggable system rather than just a solid base class.

      • Adding a new sink to a runner, with configuration variables (such as priority or weight)
      • Policy for handling process: should this just return a list of sinks to process like ChannelSelector and hand off the processing to Process? I think that the specific failover policy for each type of runner will be different so this feels awkward. I would personally prefer to just pass the process call to the pluggable component and let it be responsible for calling process on the correct sinks, as well as handling errors.

      Right now I am not convinced of the need to make SinkRunner pluggable, but I would be interested to hear other people's opinions.

        Issue Links

          Activity

          Jarek Jarcec Cecho added a comment -

          I finally found time to study the source code and shape the idea into a more or less usable form. Please consider my comment as brainstorming, and therefore feel free to add your own suggestions and ideas or share your concerns and objections. I would split the task up into several subtasks:

          1) Add support for multiple sinks into SinkRunner
          I would propose making SinkRunner aware of multiple sinks, which includes:

          • The constructor should accept an array of sinks
          • getSink and setSink should work with that array (and be renamed to getSinks and setSinks)
          • start and stop need to work with all sinks
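          The three points above could be sketched roughly as follows. This is a hypothetical illustration, not the actual Flume code: the class name MultiSinkRunner is invented here, and the Sink interface is a simplified stand-in for Flume's.

```java
// Simplified stand-in for Flume's Sink lifecycle, just for this sketch.
interface Sink {
  void start();
  void stop();
}

// Hypothetical multi-sink runner (names illustrative, not Flume API).
public class MultiSinkRunner {
  private Sink[] sinks;

  // The constructor accepts an array of sinks instead of a single one.
  public MultiSinkRunner(Sink[] sinks) {
    this.sinks = sinks;
  }

  public Sink[] getSinks() {
    return sinks;
  }

  public void setSinks(Sink[] sinks) {
    this.sinks = sinks;
  }

  // start/stop fan out to every configured sink.
  public void start() {
    for (Sink s : sinks) {
      s.start();
    }
  }

  public void stop() {
    for (Sink s : sinks) {
      s.stop();
    }
  }
}
```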

          2) Change Sink configuration
          The current sink configuration is done on the precondition that one sink has exactly one runner. As we would like to incorporate a failover mechanism into the runner, it's clear that one runner can be used for multiple sinks at once. I believe that we need to incorporate such changes into the configuration as well. I would propose the following scheme:

          • $host.sinks.$logicalName.runner For runner specific configuration (for example polling interval)
          • $host.sinks.$logicalName.runner.selector Selector type (default, failover, loadbalancer)
          • $host.sinks.$logicalName.1 First sink configuration for the runner
          • $host.sinks.$logicalName.2 Second sink configuration for the runner

          Example of "normal" sink configuration would be:

          agent1.sinks.moo.runner.selector = default
          agent1.sinks.moo.runner.polling.interval = 10
          agent1.sinks.moo.1.type = hdfs
          agent1.sinks.moo.1.hdfs.path = hdfs://namenode:8020/flume-ng/

          Example of "failover" sink configuration:

          agent2.sinks.foo.runner.selector = failover
          agent2.sinks.foo.runner.failover.option1 = value
          agent2.sinks.foo.runner.polling.interval = 10
          agent2.sinks.foo.1.type = hdfs
          agent2.sinks.foo.1.hdfs.path = hdfs://namenode:8020/flume-ng/
          agent2.sinks.foo.2.type = hdfs
          agent2.sinks.foo.2.hdfs.path = hdfs://namenode-on-mars:8020/flume-ng/

          I believe that this change should happen in PropertiesFileConfigurationProvider.loadSinks().
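          To make the proposed key scheme concrete, the grouping that loadSinks() would have to do could look roughly like this. The class and method names here (SinkConfigGrouper, group) are hypothetical, not the actual Flume code:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of how loadSinks() might group the proposed numbered keys.
public class SinkConfigGrouper {

  // Groups keys such as "agent1.sinks.moo.1.type" into sub-maps keyed by the
  // segment after the logical name: "runner" for runner-level configuration,
  // "1", "2", ... for the individual sink configurations.
  public static Map<String, Map<String, String>> group(
      Map<String, String> props, String agent, String logicalName) {
    String prefix = agent + ".sinks." + logicalName + ".";
    Map<String, Map<String, String>> grouped = new TreeMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        continue;
      }
      String rest = e.getKey().substring(prefix.length());
      int dot = rest.indexOf('.');
      if (dot <= 0) {
        continue; // no sub-key under this section, nothing to group
      }
      String section = rest.substring(0, dot);
      String subKey = rest.substring(dot + 1);
      grouped.computeIfAbsent(section, k -> new TreeMap<>()).put(subKey, e.getValue());
    }
    return grouped;
  }
}
```

With the "normal" example above, group(props, "agent1", "moo") would yield a "runner" sub-map and a "1" sub-map holding the HDFS sink's keys.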

          3) Introduce SinkRunner selector
          The sink runner selector would be an interface that chooses which sink we should try. I could imagine something like:

          public interface SinkSelector {
            int supportedSinks();
            void setSinks(Sink[] availableSinks);
            // called "try" in the text below; renamed here because "try" is a reserved word in Java
            Sink chooseSink(int attempt) throws Something;
          }

          Method supportedSinks() should return an integer constant telling how many sinks it supports at most. For example, the default selector will support only 1 sink, whereas the failover selector might support Integer.MAX_VALUE (or something more reasonable). Method SinkRunner.setSinks(Sink[]) should validate that the number of passed sinks is less than or equal to SinkSelector.supportedSinks().

          Method setSinks() should be called once on load (or change) to notify Selector about all available sinks.

          Method chooseSink() should be called from SinkRunner.PollingRunner.run to resolve the sink whose process() method should be called. If this sink is unable to deliver the event, PollingRunner.run() should call SinkSelector.chooseSink() again with an incremented "try" argument to resolve another sink to try. Method SinkSelector.chooseSink() should return null in case there is no other sink to try (or if it does not want to try another sink).
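          The retry loop just described could be sketched as below. The interfaces here are simplified stand-ins (the real Sink and SinkSelector would differ), and the "try" counter is named attempt because "try" is a reserved word in Java:

```java
// Simplified stand-in for a sink: process() returns true if the event was delivered.
interface DeliverySink {
  boolean process();
}

// Simplified stand-in for the proposed selector: null means no further sink to try.
interface Selector {
  DeliverySink chooseSink(int attempt);
}

public class PollingLoopSketch {

  // One polling pass: ask the selector for sinks with an increasing attempt
  // counter until one delivers, or the selector returns null and gives up.
  public static boolean deliverOnce(Selector selector) {
    for (int attempt = 1; ; attempt++) {
      DeliverySink sink = selector.chooseSink(attempt);
      if (sink == null) {
        return false; // selector declined to offer another sink
      }
      if (sink.process()) {
        return true; // delivered successfully
      }
    }
  }
}
```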

          4) Implement default, failover and balancer SinkSelector(s)

          I'm not an expert on the Flume source code, so I would greatly appreciate any feedback. Again, please consider my comment just as brainstorming.

          Jarcec

          Juhani Connolly added a comment -

          Hi Jarcec, thanks for your comments.
          Overall it sounds reasonable; my sticking points are communicating sink failure and recovering failed sinks:

          • I like the idea of changing it to multiple sinks and enforcing a limit for specific selectors
          • This configuration will invalidate existing configurations... Not that big a deal, I guess; I had just tried to maintain compatibility with my previous idea (and this one should result in a shorter config file)
          • We need a mechanism to let the selector know a sink has failed. From there the selector can mark it as dead, rebalancing if necessary. Other than that the interface looks good to me
          • My biggest concern is how we will test for sink recovery. For most sinks, the only way to know whether they are working is to attempt to send data through them
          Jarek Jarcec Cecho added a comment -

          Hi Juhani,
          thank you very much for your feedback. Let me clarify some thoughts that I wasn't able to describe fully last night.

          - I like the idea of changing it to multiple sinks and enforcing a limit for specific selectors

          Thanks

          - This configuration will invalidate existing configurations... Not that big a deal I guess, I had just tried to maintain it with my previous idea(and this one should result in a shorter config file)

          Yes, you're right. Changing this will invalidate all existing configurations. I believe that flume-ng is still considered to be in an alpha state, so such a change might be accepted. However, it would be nice if one of the committers agreed on that as well prior to coding.

          - We need a mechanism to let the selector know a sink has failed. From there the selector can mark it as dead, rebalancing if necessary. Other than that the interface looks good to me.

          I have an implicit way of doing so in the design. If SinkSelector.chooseSink(int try) is executed with parameter try > 1, it means that the previously returned sink has failed and we need another working sink. Consider the following example: the failover selector keeps track of the active sink. It will return this active sink for the call chooseSink(1). However, calling chooseSink(2) means that the active sink is not working and we need to move to another active sink. That said, I don't mind adding an explicit method for marking a sink as dead. It was just an idea.

          - My biggest concern is how we will test for sink recovery. Most sinks the only way to know if they are working is to try to attempt to send data through them

          I also found out that most sinks do not expose internal state, so you have to try them to find their actual state. That is actually the reason why I suggested putting a loop into SinkRunner.PollingRunner.run that keeps executing SinkSelector.chooseSink(int try) with an increasing try parameter until it returns null. This way the selector can simply force the runner to try a previously dead sink and verify whether it is still dead.

          Jarcec

          Juhani Connolly added a comment -

          - I have an implicit way of doing so in the design. If SinkRunner.chooseSink(int try) will be executed with parameter try > 1, it means that previously returned sink has failed and we need another working sink. Consider following example. Failover sink is keeping track of active sink. It will return this active sink for call chooseSink(1). However calling chooseSink(2) means that active sink is not working and we need move to another active sink. However I don't mind adding explicit method for marking some sink as dead. It was just an idea.

          This is workable but feels unwieldy; no need for the extra state, imo. I think it is more transparent to give selector developers a function in the interface that they need to implement.

          - I also found out that most sinks do not return internal state, so that you have to try them to find actual state. That is actually the reason why I suggested to put a loop into SinkRunner.PollingRunner.run to keep executing SinkRunner.chooseSink(int try) until it returns null with increasing try parameter. This way selector can simply force Runner to try some previously dead sink and verify that is still dead.

          I'm not sure we can just keep beating on dead sinks every time we want to see if they're back yet... Some of them block for a short while trying to send a message, and I don't think we can keep hammering them on every failed message... What I would have liked to do is keep a list of dead sinks and start up another thread that could periodically poll them for recovery. One possibility is that failed sinks should change their lifecycle state to stopped, and that all sinks would be required to make some kind of liveness check when starting. Right now, even if a sink returns from start() without a problem, many of them can fail on the very first process() call.

          One thing I am sure of is that right now sink implementations are inconsistent with one another, and there is no unified way of knowing when they have died (some of them never throw EventDeliveryException) or when they are working properly. I think any implementation of the selector will have to make some assumptions about their behavior, and then that behavior will need to be enforced. For me, right now those assumptions could be:

          • EventDeliveryException getting thrown signals failure
          • New status flag for sinks, or poll function, or return value on start
          Jarek Jarcec Cecho added a comment -

          - This is workable but feels unwieldy, no need for the extra state imo, I think it is more transparent to give selector developers a function in the interface they need to implement.

          I tend to agree that having an extra function for marking a dead sink would make much more sense.

          - I don't think we can just keep hammering them every failed message.

          I definitely agree here. Trying dead sinks with every message would kill performance. I was more thinking about a configuration option for the selector to try a "dead sink" once in a while (once every 15 minutes, for example).
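          The "once in a while" idea could be sketched as a small gate that only lets a retry through once the configured interval has elapsed. The class name and the explicit clock parameter below are illustrative, not Flume API:

```java
// Hypothetical gate deciding whether a dead sink should be probed again now.
public class DeadSinkRetryGate {
  private final long retryIntervalMillis;
  private boolean attemptedBefore = false;
  private long lastAttemptMillis;

  public DeadSinkRetryGate(long retryIntervalMillis) {
    this.retryIntervalMillis = retryIntervalMillis;
  }

  // Returns true (and records the attempt) if the dead sink should be probed
  // now; the clock is passed in explicitly to keep the sketch testable.
  public boolean shouldRetry(long nowMillis) {
    if (!attemptedBefore || nowMillis - lastAttemptMillis >= retryIntervalMillis) {
      attemptedBefore = true;
      lastAttemptMillis = nowMillis;
      return true;
    }
    return false;
  }
}
```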

          - What I would have liked to do is keep a list of dead sinks, and start up another thread that could periodically poll them for recovery.

          Seems like a good idea to me. However, how do you want to probe the dead sinks? Are you going to simply call the process() method and see what happens?

          - One thing I am sure of is that right now sink implementations are inconsistent with one another.

          I would suggest creating a separate JIRA for that and fixing the discrepancies in behavior prior to creating selectors in SinkRunner.

          - One possibility is that failed sinks should change their lifecycle state to stopped and that all sinks would be required to make some kind of liveliness check when starting.

          This again seems like a good way to deal with dead sinks. Your suggested thread that keeps an eye on dead sinks might then simply call start() to see whether the sink is able to start.

          Juhani Connolly added a comment -

          Cool, I think we're getting at something workable... I'm out for the weekend now, but will keep an eye on this. Hopefully one of the committers can comment on it

          Jarek Jarcec Cecho added a comment -

          I should have some time on Sunday, so if the approach is approved by the committers, I'll try to start working on it.

          Juhani Connolly added a comment -

          crosslinking relevant comment in 865 https://issues.apache.org/jira/browse/FLUME-865?focusedCommentId=13206622&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13206622

          Looks like backwards compatibility is a target...
          Arvind Prabhakar added a comment -

          Jarcec - based on the discussion in FLUME-865, the SinkRunner will remain as a concrete class and need not be made pluggable. With that, I think this issue becomes unnecessary. If you think I am missing something, please let me know.

          Jarek Jarcec Cecho added a comment -

          The discussion was moved to FLUME-865.


            People

            • Assignee: Unassigned
            • Reporter: Juhani Connolly
            • Votes: 0
            • Watchers: 0
