Flume: FLUME-1819

ExecSource doesn't flush the cache if there are no input entries

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v1.3.0
    • Fix Version/s: v1.4.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      ExecSource has a default batchSize of 20: the exec source reads data from its source, puts it into a cache, and pushes the cache to the channel once the cache is full.

      But if the cache is not full and there is no input for a long time, the cached entries stay in the cache and have no chance to reach the channel until the cache fills up.

      So the patch adds a new config option, batchTimeout, for ExecSource, with a default of 3 seconds. If batchTimeout is exceeded, all cached data is pushed to the channel even if the cache is not full.
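      A minimal sketch of the two settings and the flush rule described above, written as plain Java rather than Flume code. The class name ExecBatchSettings, the shouldFlush() helper and reading the keys from a Map are illustrative assumptions; the key names batchSize and batchTimeout and the defaults (20 events, 3 seconds) follow the description, and batchTimeout is assumed to be in milliseconds.

```java
import java.util.Map;

// Illustrative only: mirrors the batchSize/batchTimeout settings described above.
// Flume reads its settings through its own Context class; this is not that code.
final class ExecBatchSettings {
    static final int DEFAULT_BATCH_SIZE = 20;            // events per batch
    static final long DEFAULT_BATCH_TIMEOUT_MS = 3000L;  // 3 seconds (assumed to be in ms)

    final int batchSize;
    final long batchTimeoutMs;

    ExecBatchSettings(Map<String, String> props) {
        this.batchSize = parseIntOr(props.get("batchSize"), DEFAULT_BATCH_SIZE);
        this.batchTimeoutMs = parseLongOr(props.get("batchTimeout"), DEFAULT_BATCH_TIMEOUT_MS);
        if (batchSize <= 0 || batchTimeoutMs <= 0) {
            throw new IllegalArgumentException("batchSize and batchTimeout must be positive");
        }
    }

    private static int parseIntOr(String value, int fallback) {
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    private static long parseLongOr(String value, long fallback) {
        return value == null ? fallback : Long.parseLong(value.trim());
    }

    // Flush when the cache is full, or when it holds at least one event and
    // batchTimeout has elapsed since the last push to the channel.
    boolean shouldFlush(int cachedEvents, long nowMs, long lastPushMs) {
        return cachedEvents >= batchSize
                || (cachedEvents > 0 && nowMs - lastPushMs >= batchTimeoutMs);
    }
}
```

      For example, setting only batchTimeout to "2500" keeps the batch size at 20 but lets a partial batch through once 2.5 seconds have passed since the last push.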

      1. FLUME-1819.patch
        5 kB
        Fengdong Yu
      2. FLUME-1819.patch.1
        7 kB
        Venkatesh Sivasubramanian
      3. FLUME-1819.patch.2
        15 kB
        Venkatesh Sivasubramanian
      4. FLUME-1819-3.patch
        25 kB
        Venkatesh Sivasubramanian

        Activity

        Hudson added a comment -

        Integrated in flume-trunk #398 (See https://builds.apache.org/job/flume-trunk/398/)
        FLUME-1819. ExecSource must flush events to channel periodically. (Revision 609a190e3f26462e540a0f25fdb56acd79af8ddc)

        Result = SUCCESS
        hshreedharan : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=609a190e3f26462e540a0f25fdb56acd79af8ddc
        Files :

        • flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
        • flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
        • flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
        Venkatesh Sivasubramanian added a comment -

        Cool, thanks Hari!

        On Thu, Apr 25, 2013 at 11:24 AM, Hari Shreedharan (JIRA)

        Hari Shreedharan added a comment -

        I filed FLUME-2024 to add the config option to the user guide.

        Hari Shreedharan added a comment -

        Patch committed, rev: 609a190e3f26462e540a0f25fdb56acd79af8ddc. Thanks Venkatesh!

        Hari Shreedharan added a comment -

        +1. Looks good. Running tests now.

        Venkatesh Sivasubramanian added a comment -

        Hari: Good catch! I have made those changes and updated the patch.

        In the unit test, I have moved the sleep out of the loop and decreased the timeout to 750 ms. Let me know if you have any other suggestions.

        Thanks!

        Hari Shreedharan added a comment -

        Venkatesh,

        Looks good. Some minor nits:

                            if(!eventList.isEmpty() &&
                                    (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout) {
        

        can be written as

                            if(!eventList.isEmpty() && timeout()) {
        

        Also, the following code is repeated at least twice, and its first two lines appear a third time (the third and fourth lines would be harmless if executed there too):

                              channelProcessor.processEventBatch(eventList);
                              sourceCounter.addToEventAcceptedCount(eventList.size());
                              eventList.clear();
                              lastPushToChannel = systemClock.currentTimeMillis();
        

        This can be replaced with a method.

        In the unit test,

        context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "3000");
        

        This should be slightly lower, or the test might be flaky. How about reducing it to 2500 or so?
        Also, in the test, we should probably insert three different events. This can be done easily by changing the write to outputStream.write(String.valueOf(lineNumber).getBytes()); and then adding corresponding assert statements.
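        A rough sketch of the refactoring suggested in this review, assuming simplified stand-ins: BatchProcessor replaces Flume's ChannelProcessor, a plain long field replaces sourceCounter, and a LongSupplier replaces systemClock. All class and method names here are hypothetical. The four repeated lines live in one flushEventBatch() method, called when the batch is full, when timeout() reports that batchTimeout has elapsed, and once more at the end of input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for Flume's ChannelProcessor (only the method used here).
interface BatchProcessor {
    void processEventBatch(List<String> events);
}

// Sketch of the reviewed read loop with the repeated lines extracted.
// Field names follow the review snippet; the classes are not Flume's.
class ExecLoopSketch {
    private final BatchProcessor channelProcessor;
    private final LongSupplier systemClock;   // stands in for the patch's systemClock
    private final int batchSize;
    private final long batchTimeout;          // milliseconds
    private final List<String> eventList = new ArrayList<>();
    private long lastPushToChannel;
    long eventAcceptedCount;                  // stands in for sourceCounter

    ExecLoopSketch(BatchProcessor channelProcessor, LongSupplier systemClock,
                   int batchSize, long batchTimeout) {
        this.channelProcessor = channelProcessor;
        this.systemClock = systemClock;
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
        this.lastPushToChannel = systemClock.getAsLong();
    }

    // The condition from the review, pulled into a helper.
    private boolean timeout() {
        return (systemClock.getAsLong() - lastPushToChannel) >= batchTimeout;
    }

    // The four repeated lines, now in a single method.
    private void flushEventBatch() {
        channelProcessor.processEventBatch(eventList);
        eventAcceptedCount += eventList.size();
        eventList.clear();
        lastPushToChannel = systemClock.getAsLong();
    }

    void consume(Iterable<String> lines) {
        for (String line : lines) {
            eventList.add(line);
            if (eventList.size() >= batchSize || timeout()) {
                flushEventBatch();   // call sites 1 and 2: full batch or timeout
            }
        }
        if (!eventList.isEmpty()) {
            flushEventBatch();       // call site 3: end of input
        }
    }
}
```

        The sketch assumes processEventBatch() is finished with the list when it returns, since the list is cleared right afterwards; a processor that keeps a reference to it would need to copy it first.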

        Venkatesh Sivasubramanian added a comment -

        Alright, there you go Hari! I have attached the patch in here (FLUME-1819-3.patch).

        I have made the new thread an inner class. The unit test remains mostly the same; check out TestExecSource.testBatchTimeout().

        Venkatesh Sivasubramanian added a comment -

        Hari: Sorry, I am just reading your new comments. Yes, I noticed the problem with the patch and was working on correcting it in parallel.

        Cool, let me update this and post back.

        Hari Shreedharan added a comment -

        Also, could you add a unit test for this case? Also, passing the Long "lastflush" into the runnable does not update the original long when it is reassigned inside the runnable (yes, the weirdness of Java pass-by-value: even for wrapper classes, only a copy of the reference is passed, and since Long is immutable, reassigning it inside the runnable never changes the caller's variable). You can make the runnable a private inner class so it can access the lastPush variable directly (there is no need for it to be static; it does not seem to have any value without an ExecSource instance). That way you also don't need to pass in all of the other fields.
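        A small self-contained illustration of the pass-by-value point, using hypothetical class names (not Flume code). The static nested runnable receives its own copy of the Long reference, so reassigning it cannot change the caller's value; the non-static inner class writes the enclosing instance's field directly.

```java
// Hypothetical classes illustrating the review comment; not Flume code.
class LastPushDemo {
    private long lastPush = 0L;   // field owned by the enclosing "source" instance

    // Static nested runnable: gets its own copy of the Long reference.
    static class StaticFlusher implements Runnable {
        private Long lastFlush;

        StaticFlusher(Long lastFlush) {
            this.lastFlush = lastFlush;
        }

        @Override
        public void run() {
            // Rebinds this object's field to a new Long; the caller's variable
            // still refers to the old (immutable) Long.
            lastFlush = 42L;
        }
    }

    // Non-static inner class: can update the enclosing instance's field directly.
    private class InnerFlusher implements Runnable {
        @Override
        public void run() {
            lastPush = 42L;
        }
    }

    long updateViaStaticRunnable() {
        Long boxed = lastPush;
        new StaticFlusher(boxed).run();
        return boxed;      // unchanged: still 0
    }

    long updateViaInnerRunnable() {
        new InnerFlusher().run();
        return lastPush;   // now 42
    }
}
```

        The sketch runs on a single thread. When the field is written by the flush thread and read by the reader thread, it would additionally need to be volatile or guarded by a lock.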

        Hari Shreedharan added a comment -

        Venkatesh,

        The patch is not applying cleanly on trunk. Could you please rebase? Seems like FLUME-2004 changed the source. Also could you please name the patches something like: "FLUME-1819-3.patch" - OS X seems to get confused with a "patch.3" extension.

        Thanks!

        Venkatesh Sivasubramanian added a comment -

        Hari: I have uploaded a revised patch with an additional thread that flushes the buffer periodically.

        Thanks!

        Venkatesh Sivasubramanian added a comment - edited

        Yes Hari, let me take a stab at it. I will keep you posted. Thanks!

        Hari Shreedharan added a comment -

        Venkatesh,

        The patch generally looks good, but as you said, it will still wait for a new event to become available. You could have a second thread, to which the event list is passed, that checks whether the list is empty and flushes it if it is not. This thread can be scheduled (using a scheduled executor service) to run after each timeout interval. You will need to be careful about locking while the events are being flushed (the main runnable and the flush runnable should not execute the flushing code simultaneously). In this case you could even remove the timeout() method. Does that make sense?
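        A minimal sketch of this second-thread design, assuming a Consumer stands in for the channel: a single-threaded ScheduledExecutorService flushes the buffer every batchTimeoutMs, and one lock guards both the reader's add() and the timer's flush(), so the two never flush at the same time. TimedFlushBuffer and its methods are hypothetical names, not the committed patch. Unlike checking the timeout on each read, the timer fires even while the reader is blocked waiting for the next line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical buffer shared by the reader thread and a scheduled flush task.
class TimedFlushBuffer implements AutoCloseable {
    private final Object lock = new Object();
    private final List<String> eventList = new ArrayList<>();
    private final ScheduledExecutorService timedFlushService =
            Executors.newSingleThreadScheduledExecutor();
    private final int batchSize;
    private final Consumer<List<String>> sink;   // stands in for the channel

    TimedFlushBuffer(int batchSize, long batchTimeoutMs, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
        // Runs every batchTimeoutMs whether or not the reader has new input.
        // If flush() threw, later runs would be suppressed, so real code should catch and log.
        timedFlushService.scheduleWithFixedDelay(
                this::flush, batchTimeoutMs, batchTimeoutMs, TimeUnit.MILLISECONDS);
    }

    // Reader thread: append, and flush immediately once the batch is full.
    void add(String event) {
        synchronized (lock) {
            eventList.add(event);
            if (eventList.size() >= batchSize) {
                flushLocked();
            }
        }
    }

    // Timer thread: the shared lock stops it from flushing concurrently with add().
    void flush() {
        synchronized (lock) {
            flushLocked();
        }
    }

    private void flushLocked() {
        if (!eventList.isEmpty()) {
            sink.accept(new ArrayList<>(eventList));
            eventList.clear();
        }
    }

    @Override
    public void close() {
        timedFlushService.shutdown();   // cancels the periodic task
        flush();                        // push anything still cached
    }
}
```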

        Venkatesh Sivasubramanian added a comment -

        Cool, thanks!

        Hari Shreedharan added a comment -

        That is fine, then. It should be ok to just attach the patch here

        Venkatesh Sivasubramanian added a comment -

        Hari: Thanks for your response! I did try that, and also tried all of the combinations below. Unfortunately, I am still getting the same message.

        git diff > FLUME-1819.patch
        git diff --full-index trunk > FLUME-1819.patch
        git diff --no-prefix trunk > FLUME-1819.patch
        git format-patch trunk --no-prefix --stdout > FLUME-1819.patch

        Hari Shreedharan added a comment -

        Venkatesh,

        You can just use git diff > FLUME-1819.patch. That should work fine. Not sure why git format-patch isn't working though.

        Venkatesh Sivasubramanian added a comment -

        Hari: On Review Board, when I try to upload the git diff, I get the message "No valid separator after the filename was found in the diff header". Do you know how I can resolve that? For now, I have uploaded the patch as an attachment to the request (Review Request #10632).

        Btw, I used "git format-patch trunk --no-prefix --stdout > FLUME-1819.patch" to generate the patch.

        Thanks
        Venkatesh

        Venkatesh Sivasubramanian added a comment -

        Hi Hari: Pls. find the patch for this issue attached. I have also added a unit test case for it, and additionally tested it locally with an actual agent. Pls. review and let me know if you have any suggestions or feedback.

        I will add this to the review board now as well.

        There is a known issue with this fix: even after a timeout has occurred, it still needs one more event to arrive before it can flush the contents of the buffer. That is because our readLine() call blocks until there is data in the InputStream. Changing this has larger implications, so I left it as is. Pls. do let me know if you have any suggestions around this as well. Thank you!!

        Best Regards,
        Venkatesh

        Venkatesh Sivasubramanian added a comment -

        Thanks Hari!

        Hari Shreedharan added a comment -

        Venkatesh Sivasubramanian: Go for it!

        Venkatesh Sivasubramanian added a comment -

        Hi Hari: We have a need for this fix. I would like to work on this and take it to completion, if that is fine with all. I can work on the FLUME-1837 issue as well in parallel.

        Pls. let me know.

        Hari Shreedharan added a comment -

        Hi Fengdong,

        I posted a review for this patch quite a while back on review board. Are you planning to continue working on this?

        André Stein added a comment -

        Done. FLUME-1837

        Fengdong Yu added a comment -

        Yes, Stein, you'd better open a new issue.

        André Stein added a comment -

        Hi there.

        I played around a bit with 1.3.1 and also ran into the behaviour this bug addresses. Thanks for the patch.
        But I think there's another related issue to this: When stopping the application, it may happen that there are still events in the cache ("eventList") that are never flushed and therefore lost. Looking at the code strengthens this suspicion. Am I right and should I open a new issue for this behavior?

        Regards,

        André
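        A minimal sketch of the shutdown ordering André describes, not the eventual FLUME-1837 change: stop the flush timer, wait for any in-flight run to finish, then push whatever is still cached under the same lock the reader and timer use. StopDrainSketch and its parameters are hypothetical, and the 10-second wait is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stop() ordering; not the actual Flume change.
final class StopDrainSketch {
    private StopDrainSketch() {}

    static void stop(ExecutorService timedFlushService, Object lock,
                     List<String> eventList, Consumer<List<String>> sink)
            throws InterruptedException {
        // 1. No new timed flushes.
        timedFlushService.shutdown();
        // 2. Let a flush that is already running finish (10 s is an arbitrary bound).
        timedFlushService.awaitTermination(10, TimeUnit.SECONDS);
        // 3. Push whatever is still cached instead of dropping it.
        synchronized (lock) {
            if (!eventList.isEmpty()) {
                sink.accept(new ArrayList<>(eventList));
                eventList.clear();
            }
        }
    }
}
```

        In a real source the reader thread would also have to be stopped first, so that it cannot add events after the final drain; that step is omitted here.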

        Hari Shreedharan added a comment -

        I posted a review on review board.

        Fengdong Yu added a comment -

        Hari,

        I did not remove the charset support; the patch I generated was just incorrect. Patch updated now. Thanks.

        Hari Shreedharan added a comment -

        The new code looks good, but why did you remove the charset support?


          People

          • Assignee:
            Venkatesh Sivasubramanian
            Reporter:
            Fengdong Yu
          • Votes:
            0
            Watchers:
            5
