Flume / FLUME-1150

Need a way to specify bucket in HDFSEventSink from client

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      We need a way to specify the bucket when an event is streamed from the client.

      For instance, suppose I have 5 different log categories (category1 to category5). If I use a single source->channel->hdfsSink, all the logs go to one specified directory in HDFS (/logs/<destination>), but we need something like:

      /logs/category1
      /logs/category2
      /logs/category3
      /logs/category4
      /logs/category5

      I can use multiplexing (1 source, N channels, and N sinks), but then I need a multiplexing channel selector to route the events (categories) to the proper sink.
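The multiplexing workaround boils down to a lookup from a header value to a channel, with a default channel for unmapped values. The sketch below is a plain-Java model of that routing decision only, not Flume's multiplexing channel selector; the class name `CategoryRouter`, the header key `category`, and the channel names are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of header-based routing: 1 source, N channels, one HDFS sink per channel. */
public class CategoryRouter {
    private final String headerKey;
    private final String defaultChannel;
    private final Map<String, String> mapping = new HashMap<>();

    public CategoryRouter(String headerKey, String defaultChannel) {
        this.headerKey = headerKey;
        this.defaultChannel = defaultChannel;
    }

    /** Registers the channel that receives events whose header equals headerValue. */
    public CategoryRouter map(String headerValue, String channel) {
        mapping.put(headerValue, channel);
        return this;
    }

    /** Picks a channel from the event headers, falling back to the default channel. */
    public String route(Map<String, String> headers) {
        String value = headers.get(headerKey);
        if (value == null) {
            return defaultChannel; // header missing: send to the default channel
        }
        return mapping.getOrDefault(value, defaultChannel);
    }

    public static void main(String[] args) {
        CategoryRouter router = new CategoryRouter("category", "ch-default");
        for (int i = 1; i <= 5; i++) {
            router.map("category" + i, "ch" + i);
        }
        Map<String, String> headers = new HashMap<>();
        headers.put("category", "category3");
        System.out.println(router.route(headers)); // ch3
        System.out.println(router.route(new HashMap<>())); // ch-default
    }
}
```

Each channel would then be drained by its own HDFS sink with a fixed path such as /logs/category3, which is why this approach needs one sink per category.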

      What I have in mind: if I specify the category name in an event header, HDFSEventSink (and its helper classes) could create the bucket from that header value, while still honoring the escape sequences.

        Activity

        Mubarak Seyed added a comment -

        Had a discussion with Mike Percy on IRC:

        [16:47] <mpercy> maybe syntax like sinkName.hdfs.path = /logs/%{tableName|UNKNOWN_TABLE}/%Y%m%d/%H:%M:%S
        [16:48] <mpercy> where tableName is the key of a header
        [16:48] <mpercy> and "UNKNOWN_TABLE" is the fallback value
        [16:48] <mpercy> the rest is already implemented
        [16:48] <mpercy> in BucketPath
        [16:59] <mubarak_> ok, make sense
        [17:00] <mpercy> there is some logic in BucketPath that might already almost do this
        [17:00] <mubarak_> ok
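The fallback syntax proposed above, %{tableName|UNKNOWN_TABLE}, could be parsed with a regex that captures an optional default after the `|`. This is a hypothetical sketch of the proposal, not code from BucketPath: the class and method names are illustrative, it only expands %{...} header tags (time escapes such as %Y are left untouched), and treating a missing header with no fallback as an empty string is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FallbackTagSketch {
    // Hypothetical tag form: %{headerKey} or %{headerKey|fallback}.
    // Group 1 is the header key, group 2 the optional fallback value.
    private static final Pattern HEADER_TAG =
        Pattern.compile("%\\{([\\w.-]+)(?:\\|([\\w.-]*))?\\}");

    /** Replaces header tags in the path; other escapes (e.g. %Y) are left as-is. */
    public static String expandHeaders(String path, Map<String, String> headers) {
        Matcher m = HEADER_TAG.matcher(path);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.get(m.group(1));
            if (value == null) {
                // Assumption: use the fallback if one is given, otherwise an empty string.
                value = m.group(2) != null ? m.group(2) : "";
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String path = "/logs/%{tableName|UNKNOWN_TABLE}/%Y%m%d/%H:%M:%S";
        Map<String, String> headers = new HashMap<>();
        System.out.println(expandHeaders(path, headers)); // /logs/UNKNOWN_TABLE/%Y%m%d/%H:%M:%S
        headers.put("tableName", "table1");
        System.out.println(expandHeaders(path, headers)); // /logs/table1/%Y%m%d/%H:%M:%S
    }
}
```

Keeping the fallback inside the tag means each path pattern carries its own default, instead of the sink using one global placeholder.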

        Mubarak Seyed added a comment - edited

        If we don't change

        TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";

        then we can specify the pattern as

        sinkName.hdfs.path = /logs/%{tableName}/%Y%m%d/%H:%M:%S

        or

        sinkName.hdfs.path = /logs/%{tableName}

        and the event header should contain the key "tableName" with the bucket name as its value (for example: headers.put("tableName", "table1");).

        If there is no "tableName" key, or its value is null, we could fall back to "UNKNOWN" (so the path would be /logs/UNKNOWN).

        If I specify the pattern as /logs/%{tableName|UNKNOWN}, then we need to change TAG_REGEX.

        I tested without the "|UNKNOWN" part, and it works fine without any code change. Thanks.
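To see why %{tableName} works unchanged while %{tableName|UNKNOWN} needs a regex change, the sketch below compiles the TAG_REGEX string quoted above and lists which header keys it recognizes in a path. Only the regex comes from the comment; the class name `TagRegexCheck` and the helper `headerKeys` are illustrative and do not reproduce how BucketPath performs the actual substitution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TagRegexCheck {
    // TAG_REGEX exactly as quoted in the comment (a Java string literal).
    static final Pattern TAG = Pattern.compile("\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}");

    /** Returns the header keys captured from the %{...} tags that TAG_REGEX recognizes. */
    public static List<String> headerKeys(String path) {
        List<String> keys = new ArrayList<>();
        Matcher m = TAG.matcher(path);
        while (m.find()) {
            // Group 2 is set only when the %{key} alternative matched;
            // %Y-style escapes match the first alternative and are skipped here.
            if (m.group(2) != null) {
                keys.add(m.group(2));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(headerKeys("/logs/%{tableName}/%Y%m%d/%H:%M:%S")); // [tableName]
        // '|' is not in the character class, so this tag is not recognized at all.
        System.out.println(headerKeys("/logs/%{tableName|UNKNOWN}")); // []
    }
}
```

With the regex unchanged, a %{tableName|UNKNOWN} tag is not matched, so it would presumably stay in the path as literal text rather than being expanded.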

        Mubarak Seyed added a comment -

        I heard that the multiplexing channel selector will solve the problem.

        Mike Percy added a comment -

        Thanks for checking on this, Mubarak. That functionality is documented somewhat incorrectly in the user guide, so I wasn't sure it worked as we were describing.

        What happens if the header is not set? Does it just use an empty string?

        Re: the multiplexing selector, that can work too, if you are OK with configuring a different sink for each condition you want to handle.

        Mubarak Seyed added a comment -

        It is configuration-related: we can use multiple channels or multiple HDFS sinks to create different buckets.


          People

          • Assignee: Unassigned
          • Reporter: Mubarak Seyed
          • Votes: 0
          • Watchers: 2
