Flume / FLUME-2209

AsyncHBaseSink will never recover if the column family does not exist on the first start

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: v1.4.0
    • Fix Version/s: v1.5.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      Hi,
      I am facing an issue. My initial analysis is as follows.

      I am using AsyncHBaseSink. During startup the configured column family does not exist, and the sink throws this exception:

      09 Oct 2013 14:45:56,691 ERROR [lifecycleSupervisor-1-2] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:253)  - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5470be88 counterGroup:{ name:null counters:{} } } - Exception follows.
      org.apache.flume.FlumeException: Could not start sink. Table or column family does not exist in Hbase.
      	at org.apache.flume.sink.hbase.AsyncHBaseSink.start(AsyncHBaseSink.java:384)
      	at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
      	at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
      	at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      

      After this, the sink is dead, and LifecycleSupervisor tries to start it again:

      Want to transition SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d7aece8 counterGroup:{ name:null counters:{} } } from IDLE to START (failures:23)
      

      Here it gets the following exception:

      09 Oct 2013 14:49:26,043 ERROR [lifecycleSupervisor-1-2] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:253)  - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d7aece8 counterGroup:{ name:null counters:{} } } - Exception follows.
      java.lang.IllegalArgumentException: Please call stop before calling start on an old instance.
      	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:92)
      	at org.apache.flume.sink.hbase.AsyncHBaseSink.start(AsyncHBaseSink.java:344)
      	at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
      	at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
      	at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
      	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      

      I think the issue is that we are not closing the client when the exception is thrown.

      if (fail.get()) {
        sinkCounter.incrementConnectionFailedCount();
        throw new FlumeException(
            "Could not start sink. " +
            "Table or column family does not exist in Hbase.");
      } else {
        open = true;
      }
      

      Is this an issue, or am I missing something?
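The suspected sequence can be reproduced in isolation. The sketch below is a hypothetical stand-in, not the real AsyncHBaseSink: the class `StuckSinkSketch`, its `client` field and the `tableExists` flag are invented for illustration, `IllegalStateException` stands in for `FlumeException`, and it assumes the precondition in `start()` rejects a leftover client reference.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the sink; not the real AsyncHBaseSink.
class StuckSinkSketch {
    private Object client;                    // stands in for the HBase client
    private final AtomicBoolean tableExists;  // simulates table/column family presence

    StuckSinkSketch(AtomicBoolean tableExists) {
        this.tableExists = tableExists;
    }

    void start() {
        // Assumed guard, modelled on the message in the second stack trace.
        if (client != null) {
            throw new IllegalArgumentException(
                "Please call stop before calling start on an old instance.");
        }
        client = new Object();
        if (!tableExists.get()) {
            // The client is left in place on this path (the suspected problem).
            throw new IllegalStateException(
                "Could not start sink. Table or column family does not exist in Hbase.");
        }
    }

    public static void main(String[] args) {
        AtomicBoolean tableExists = new AtomicBoolean(false);
        StuckSinkSketch sink = new StuckSinkSketch(tableExists);
        for (int attempt = 1; attempt <= 3; attempt++) {
            if (attempt == 3) {
                tableExists.set(true);  // table created while the agent is running
            }
            try {
                sink.start();
                System.out.println("attempt " + attempt + ": started");
            } catch (RuntimeException e) {
                System.out.println("attempt " + attempt + ": "
                    + e.getClass().getSimpleName());
            }
        }
    }
}
```

In this sketch the first attempt fails because the table is missing, and every later attempt fails on the guard, even after the table has been created, because the stale client is never released.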

      1. FLUME-2209-0.patch (0.8 kB, Ashish Paliwal)


          Activity

          Hudson added a comment -

          FAILURE: Integrated in flume-trunk #524 (See https://builds.apache.org/job/flume-trunk/524/)
          FLUME-2209. AsyncHBaseSink will never recover if the column family does not exists for the first start. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=67454a71a3aba308ff0d1b29ad3f184e5c37fee2)

          • flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
          Hari Shreedharan added a comment -

          Committed, rev: 67454a7. Thanks Ashish!

          Hari Shreedharan added a comment -

          +1. Looks good. Committing.

          nijel added a comment - edited

          +1 for the fix.

          Ashish Paliwal added a comment -

          Hari Shreedharan, I have a fix for the issue. With the fix, the sink no longer gets into this hung state, and if the table is created while it is running, the sink starts, as HBaseSink does. Let me know and I will upload the patch and open a review request.

          Ashish Paliwal added a comment -

          Hari Shreedharan, I simulated the scenario on my local box. HBaseSink and AsyncHBaseSink behave differently.

          If the table/CF does not exist, both sinks throw an exception and get into a loop. However, if we create the table/CF in HBase after seeing the exception, HBaseSink recovers, whereas AsyncHBaseSink does not, which is the problem described in this JIRA.

          IMHO, the behavior of both sinks should be the same. Thoughts?
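The recovering behavior described for HBaseSink can be illustrated with a small simulation. This is only a sketch under stated assumptions: `RecoveringSinkSketch`, `attemptsUntilStarted` and the `tableExists` flag are hypothetical names, `IllegalStateException` stands in for `FlumeException`, and the loop is a simplified stand-in for the supervisor's repeated start attempts. The one behavioral point it shows is releasing the client before failing, so the next attempt begins from a clean state.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a sink that cleans up after a failed start.
class RecoveringSinkSketch {
    private Object client;                    // stands in for the HBase client
    private boolean open;
    private final AtomicBoolean tableExists;  // simulates table/column family presence

    RecoveringSinkSketch(AtomicBoolean tableExists) {
        this.tableExists = tableExists;
    }

    void start() {
        if (client != null) {
            throw new IllegalArgumentException(
                "Please call stop before calling start on an old instance.");
        }
        client = new Object();
        if (!tableExists.get()) {
            client = null;  // release the client before failing
            throw new IllegalStateException(
                "Could not start sink. Table or column family does not exist in Hbase.");
        }
        open = true;
    }

    boolean isOpen() {
        return open;
    }

    // Simplified retry loop: the table appears just before the given attempt.
    // Returns the attempt number that succeeded, or -1 if none did.
    static int attemptsUntilStarted(RecoveringSinkSketch sink,
                                    AtomicBoolean tableExists,
                                    int createTableBeforeAttempt,
                                    int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt == createTableBeforeAttempt) {
                tableExists.set(true);
            }
            try {
                sink.start();
                return attempt;
            } catch (RuntimeException e) {
                // Start failed; the loop tries again, as the supervisor would.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicBoolean tableExists = new AtomicBoolean(false);
        RecoveringSinkSketch sink = new RecoveringSinkSketch(tableExists);
        int attempt = attemptsUntilStarted(sink, tableExists, 3, 5);
        System.out.println("started on attempt " + attempt);
    }
}
```

With the table created just before the third attempt, the sketch starts on attempt 3, because the two earlier failures left no stale client behind.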

          Ashish Paliwal added a comment -

          Hari Shreedharan, I have not really thought of a fix yet; I just want to play with the HBase sink, as I have not used it before. A possible fix would be to retry a couple of times and then give up, though I am not sure whether killing the agent would be a good idea. The other alternative is to mention in the User Guide the precondition that the table and CF must exist in HBase. Open to suggestions.

          Hari Shreedharan added a comment -

          I agree that it is a good-to-have fix. But that is one of the assumptions the sink makes: the table and CF are present when the sink starts. I don't really think this is a bug; this is user error.

          Ashish Paliwal added a comment -

          I wrote a test case to reproduce this. I am able to reproduce the error, but not the lifecycle error. I will try with a basic HBase setup to emulate the scenario and fix it.

          nijel added a comment -

          I thought of closing the client when the exception occurs:

          if (fail.get()) {
            sinkCounter.incrementConnectionFailedCount();
            // Release the client so a later start() does not see a stale instance.
            if (client != null) {
              try {
                client.shutdown();
              } catch (Exception e) {
                logger.warn("Exception occurred while shutting down the client.", e);
              }
              client = null;
            }
            throw new FlumeException(
                "Could not start sink. " +
                "Table or column family does not exist in Hbase.");
          } else {
            open = true;
          }
          

            People

            • Assignee: Ashish Paliwal
            • Reporter: nijel
            • Votes: 0
            • Watchers: 3
