Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v1.4.0
    • Fix Version/s: v1.5.0
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      I'd like to add a Flume sink for Kite (kitesdk.org) Datasets. Kite Datasets is an API for working with data in Hadoop; a dataset can be backed by partitioned HDFS directories (with support that syncs with Hive) or by HBase tables. This sink will depend on the Kite library and use it to write.

        Activity

        rdblue Ryan Blue added a comment -

        I've pushed an initial implementation and welcome any comments for how it needs to be improved. Thanks!

        rdblue Ryan Blue added a comment -

        This patch adds a module under flume-ng-sinks called flume-dataset-sink.

        hshreedharan Hari Shreedharan added a comment -

        Sorry, I accidentally assigned this to myself, thanks to JIRA's single-key shortcuts.

        hshreedharan Hari Shreedharan added a comment -

        Looks like this depends on an artifact that is not available:

        [ERROR] Failed to execute goal on project flume-dataset-sink: Could not resolve dependencies for project org.apache.flume.flume-ng-sinks:flume-dataset-sink:jar:1.5.0-SNAPSHOT: Could not find artifact org.kitesdk:kite-data-core:jar:0.10.1 in repo1.maven.org (http://repo1.maven.org/maven2) -> [Help 1]
        [ERROR] 
        [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
        [ERROR] Re-run Maven using the -X switch to enable full debug logging.
        [ERROR] 
        [ERROR] For more information about the errors and possible solutions, please read the following articles:
        [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
        [ERROR] 
        [ERROR] After correcting the problems, you can resume the build with the command
        
        

        IntelliJ complains that the Dataset* classes are missing, probably as a result of the above error.

        rdblue Ryan Blue added a comment -

        It looks like the Cloudera repository, which hosts all the kite jars, needs to be added to the pom. Here's the code:

          <repositories>
            <repository>
              <id>cdh.repo</id>
              <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
              <name>Cloudera Repositories</name>
              <snapshots>
                <enabled>false</enabled>
              </snapshots>
            </repository>
              
            <repository>
              <id>cdh.snapshots.repo</id>
              <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
              <name>Cloudera Snapshots Repository</name>
              <snapshots>
                <enabled>true</enabled>
              </snapshots>
              <releases>
                <enabled>false</enabled>
              </releases>
            </repository>
          </repositories>
        
        rdblue Ryan Blue added a comment -

        Updated patch for kite artifact repositories

        hshreedharan Hari Shreedharan added a comment -

        This patch looks good. I have a few comments:

        • There are a bunch of lines > 80 chars. Please make sure all lines are < 80 chars.
        • There is at least one "FIXME". That should probably be fixed.
        • Nit: In general, most Flume components put their configuration parameter names and defaults in separate classes as static finals (see HBaseSink, FileChannel, etc.). Only the HDFS sink, being older and more difficult to refactor, keeps them in the same class. Also, we usually give them names like CONFIG_HDFS_URI.
        • super.start() should be called at the end of the start method. The call tells the framework that the sink started successfully, so the framework will not try to start it again. If super.start() is called first and the rest of the start method then throws, the framework will not retry the start even though it should.
        • Regarding this line:
          new ThreadFactoryBuilder().setNameFormat(getName() + "")
          Is the + "" needed? I think the thread should be named something like getName() + "-timed-roll-thread".
        • Nit: Scheduling the roll timer with scheduleWithFixedDelay might be better than using scheduleAtFixedRate.
        • Looks like the rolling is not proactive. The rolling is done only if another event comes after a scheduled delay. This can mean that the file may not be rolled for quite a while (or even never, if no data is written to that partition again). This affects data visibility as other processes may not see the data till the file is closed (or might skip it since the file would still be open for write). We probably should make it more proactive.
        • channel.getTransaction() and transaction.begin() should also be inside the try (since a transaction that may have been opened should be closed).
        • Unfortunately, even transaction.rollback() can throw (for example, in the file channel if there is no disk space). The call needs to be wrapped in a try-catch, with an error message logged in the catch block.
        • Could you also add a test where the data goes into different directories using Kite's partitioning mechanisms?
        • There seems to be no documentation on how to use the sink and what headers are required. Could you please add detailed docs and also describe the config params? Also, since this is a new component, please mark it as experimental in the docs (see the Twitter source as an example).

        Thanks Ryan!
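
The two transaction points in the review above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code from the patch: `Tx` and `Chan` are hypothetical stand-ins for Flume's transaction and channel types, `TransactionSketch` and its `LOG` list are invented for the example, and the boolean return value replaces the status a real sink would return.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Flume transaction; not the real Flume interface. */
interface Tx {
  void begin();
  void commit();
  void rollback();
  void close();
}

/** Hypothetical stand-in for a Flume channel. */
interface Chan {
  Tx getTransaction();
}

class TransactionSketch {
  /** Collected messages, standing in for a real logger. */
  static final List<String> LOG = new ArrayList<>();

  /**
   * Runs one unit of work inside a transaction.
   * Returns true if the work committed, false if it was rolled back.
   */
  static boolean process(Chan channel, Runnable work) {
    Tx tx = null;
    try {
      // Both calls sit inside the try, so a failure in begin()
      // still reaches the finally block and closes the transaction.
      tx = channel.getTransaction();
      tx.begin();
      work.run();
      tx.commit();
      return true;
    } catch (RuntimeException e) {
      LOG.add("processing failed: " + e.getMessage());
      if (tx != null) {
        try {
          tx.rollback();
        } catch (RuntimeException rollbackFailure) {
          // rollback() can itself throw; log it instead of masking the first error.
          LOG.add("rollback failed: " + rollbackFailure.getMessage());
        }
      }
      return false;
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
```

The design choice here is that the transaction is closed exactly once in `finally`, whether the work commits, the rollback succeeds, or the rollback itself fails.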

        rdblue Ryan Blue added a comment -

        The "FIXME" comment is about how to get a Hadoop Configuration. How is this normally done? The HDFS sink that I was basing this on called new Configuration() – is that the recommended solution?

        hshreedharan Hari Shreedharan added a comment -

        I would assume that is fine, since the HDFS path is picked up from the Flume configuration, so it works even if hdfs-site.xml is not on the classpath.

        Also, can you add a test that uses an HDFS minicluster?

        rdblue Ryan Blue added a comment -

        Here is a new patch with the requested changes, except documentation updates. In order to add proactive file rolling, I've introduced a lock for the writer. I'm posting this diff to get started on reviewing that addition.
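
One way to picture proactive rolling with a writer lock is the sketch below. It is an illustration under stated assumptions, not the code in the patch: `RollingWriterSketch` is a hypothetical class, a `StringBuilder` stands in for the open Kite writer, and the current time is passed in as a parameter so the roll decision can be exercised without waiting. The thread name suffix and the use of `scheduleWithFixedDelay` follow the review suggestions earlier in this thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class RollingWriterSketch {
  private final ReentrantLock writerLock = new ReentrantLock();
  private final long rollIntervalMillis;   // assumed to be greater than zero
  private StringBuilder openFile;          // stand-in for an open dataset writer
  private long openedAtMillis;
  private int closedFiles;
  private ScheduledExecutorService roller;

  RollingWriterSketch(long rollIntervalMillis) {
    this.rollIntervalMillis = rollIntervalMillis;
  }

  /** Appends an event, opening a new "file" if none is open. */
  void write(String event, long nowMillis) {
    writerLock.lock();
    try {
      if (openFile == null) {
        openFile = new StringBuilder();
        openedAtMillis = nowMillis;
      }
      openFile.append(event).append('\n');
    } finally {
      writerLock.unlock();
    }
  }

  /** Closes the open "file" if it is older than the roll interval. */
  boolean rollIfDue(long nowMillis) {
    writerLock.lock();
    try {
      if (openFile != null && nowMillis - openedAtMillis >= rollIntervalMillis) {
        openFile = null;   // a real sink would close the writer here
        closedFiles++;
        return true;
      }
      return false;
    } finally {
      writerLock.unlock();
    }
  }

  int closedFiles() {
    writerLock.lock();
    try {
      return closedFiles;
    } finally {
      writerLock.unlock();
    }
  }

  /** Starts a timer thread that rolls even when no further events arrive. */
  void start(String sinkName) {
    roller = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, sinkName + "-timed-roll-thread");
      t.setDaemon(true);
      return t;
    });
    roller.scheduleWithFixedDelay(
        () -> rollIfDue(System.currentTimeMillis()),
        rollIntervalMillis, rollIntervalMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    if (roller != null) {
      roller.shutdownNow();
    }
  }
}
```

Because the timer thread and the event-processing thread both touch the writer, every access goes through the same lock; the timer only closes a file, and the next write reopens one.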

        rdblue Ryan Blue added a comment -

        Patch with code and documentation updates.

        hshreedharan Hari Shreedharan added a comment -

        This looks ready to go. I have one minor request - could you put in a link to the Kite docs/tutorial in the documentation?

        hshreedharan Hari Shreedharan added a comment -

        ... and could you also note that it works only against Hadoop 2.x?

        rdblue Ryan Blue added a comment -

        Patch with a link to the Kite reference guide.

        rdblue Ryan Blue added a comment -

        Yes, it only works against Hadoop 2.x because Kite calls hflush. We're adding support for 1.x, and I'll post a patch to use it when the Kite support is released. In the meantime, I've updated the build so that the module doesn't appear in the hadoop-1 builds.

        paliwalashish Ashish Paliwal added a comment -

        Ryan Blue: We follow a different patch naming convention, FLUME-<JIRANUM>-<VERSION>.patch, for example FLUME-2294-4.patch.

        For subsequent patch submissions, it would be great if you could follow this naming convention.
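
As a small illustration of the naming convention above, the helper below builds and checks patch file names. It is a sketch only: `PatchNameSketch` is a hypothetical class, and it assumes both the JIRA number and the version are plain decimal integers, as in the FLUME-2294-4.patch example.

```java
import java.util.regex.Pattern;

class PatchNameSketch {
  // Assumption: JIRANUM and VERSION are both decimal integers.
  private static final Pattern PATCH_NAME =
      Pattern.compile("FLUME-(\\d+)-(\\d+)\\.patch");

  /** Returns true if the file name follows FLUME-<JIRANUM>-<VERSION>.patch. */
  static boolean isValid(String fileName) {
    return PATCH_NAME.matcher(fileName).matches();
  }

  /** Builds a patch file name for the given issue number and patch version. */
  static String build(int jiraNumber, int version) {
    return "FLUME-" + jiraNumber + "-" + version + ".patch";
  }
}
```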

        rdblue Ryan Blue added a comment -

        Ashish: will do in the future. Thanks for letting me know.

        hshreedharan Hari Shreedharan added a comment -

        Sorry for dragging this on, but it looks like there are a couple of places where you need to add the new module as a dependency: the top-level pom and the pom of the dist module. If you don't add these, the sources/jars will not get shipped with the distribution.

        hshreedharan Hari Shreedharan added a comment -

        Also DatasetSinkConstants.java is missing the ASLv2 license header.
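
For reference, a constants class carrying the standard ASF source header might look like the sketch below. The header text is the usual Apache License 2.0 source header; the class name and the constant names and values are hypothetical placeholders chosen to follow the CONFIG_ prefix convention mentioned earlier in this thread, and are not taken from the committed DatasetSinkConstants.java.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/** Hypothetical constants holder; names and values are illustrative only. */
final class DatasetSinkConstantsSketch {
  /** Configuration key for an example batch size setting (placeholder name). */
  static final String CONFIG_EXAMPLE_BATCH_SIZE = "example.batchSize";

  /** Default used when the example batch size is not configured (placeholder value). */
  static final long DEFAULT_EXAMPLE_BATCH_SIZE = 100;

  private DatasetSinkConstantsSketch() {
    // Constants only; not meant to be instantiated.
  }
}
```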

        rdblue Ryan Blue added a comment -

        What do I need to do in the pom? I've added the module in the flume-ng-sinks pom, but it's in a profile because Kite is only compatible with hadoop-2 until the next release.

            <profile>
              <id>hadoop-2</id>
              <activation>
                <property>
                  <name>hadoop.profile</name>
                  <value>2</value>
                </property>
              </activation>
              <!-- add the flume-dataset-sink, which is only compatible with hadoop-2
                   -->
              <modules>
                <module>flume-dataset-sink</module>
              </modules>
            </profile>
        
        hshreedharan Hari Shreedharan added a comment -

        +1. This looks good. I am committing this after making some minor fixes:

        • Added the dataset sink to the dist module (and corrected the top-level pom's dataset sink dependency)
        • Modified the test class to delete the test directory at the end of the execution.
        jira-bot ASF subversion and git services added a comment -

        Commit 68ba5cf7185f333ad8723c3af5bcefe868c783cd in branch refs/heads/trunk from Hari Shreedharan
        [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=68ba5cf ]

        FLUME-2294. Add a sink for Kite Datasets.

        (Ryan Blue via Hari Shreedharan)

        jira-bot ASF subversion and git services added a comment -

        Commit 0994eb27a91eb7ea8996acf5e8938b3ee79fbc30 in branch refs/heads/flume-1.5 from Hari Shreedharan
        [ https://git-wip-us.apache.org/repos/asf?p=flume.git;h=0994eb2 ]

        FLUME-2294. Add a sink for Kite Datasets.

        (Ryan Blue via Hari Shreedharan)

        hshreedharan Hari Shreedharan added a comment -

        Committed! Thanks Ryan!

        hudson Hudson added a comment -

        SUCCESS: Integrated in flume-trunk #541 (See https://builds.apache.org/job/flume-trunk/541/)
        FLUME-2294. Add a sink for Kite Datasets. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=68ba5cf7185f333ad8723c3af5bcefe868c783cd)

        • flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java
        • flume-ng-dist/pom.xml
        • flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
        • flume-ng-sinks/pom.xml
        • flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
        • pom.xml
        • flume-ng-sinks/flume-dataset-sink/pom.xml
        • flume-ng-doc/sphinx/FlumeUserGuide.rst

          People

          • Assignee:
            rdblue Ryan Blue
            Reporter:
            rdblue Ryan Blue
          • Votes:
            0
            Watchers:
            6
