Flume / FLUME-2190

Add a source capable of feeding off of the Twitter Streaming API

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.1
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      I would like to propose adding a source that connects via the Streaming API to the 1% sample of the Twitter firehose, continuously downloads tweets, converts them to Avro format, and sends the Avro events to a downstream Flume sink.

          Activity

          rvs Roman Shaposhnik added a comment -

          Attaching a patch heavily based on the code originally developed by Jonathan Natkins: https://github.com/cloudera/cdh-twitter-example/blob/master/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java

          It has since been hacked upon by Wolfgang Hoschek, Sean Mackrory, and yours truly.

          hshreedharan Hari Shreedharan added a comment -

          Thanks Roman for the patch. Looks pretty interesting as a demo/example source. Could you please add a section in the Flume User Guide with details on how to use the source. Also, please mark this source as experimental (in the source as well as in the User Guide - use a ".. warning::" in the rst file to make it prominent). I looked at it, and have a few comments:

          • There are a bunch of unused imports. Please clean them up.
          • Please use the interface stability annotations to mark it as private and stable/unstable.
          • There are several lines > 80 chars long. Please make sure lines are <=80 chars.

          super.start();
          

          This should happen at the end of the start method, since this will change the lifecycle status of the component to tell the Flume framework that this component has started. Doing this at the beginning of the start method tells the framework that the component started successfully, even if the method actually throws later.
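The ordering concern above can be sketched as follows. This is a minimal illustration only: `SourceSketch`, `LifecycleStateSketch`, and `StreamingSourceSketch` are hypothetical stand-ins, not Flume's real classes, and the `failOnConnect` flag exists purely to simulate a startup failure.

```java
// Hypothetical stand-ins for illustration; these are not Flume classes.
enum LifecycleStateSketch { IDLE, START }

abstract class SourceSketch {
  private LifecycleStateSketch state = LifecycleStateSketch.IDLE;

  // Plays the role of super.start(): records that startup succeeded.
  void start() {
    state = LifecycleStateSketch.START;
  }

  LifecycleStateSketch getState() {
    return state;
  }
}

class StreamingSourceSketch extends SourceSketch {
  private final boolean failOnConnect;

  StreamingSourceSketch(boolean failOnConnect) {
    this.failOnConnect = failOnConnect;
  }

  @Override
  void start() {
    // Do everything that can fail first.
    connect();
    // Only now report the component as started.
    super.start();
  }

  private void connect() {
    if (failOnConnect) {
      throw new IllegalStateException("could not connect");
    }
  }
}
```

With this ordering, a source whose `connect()` throws is never marked as started, so the recorded state stays consistent with what actually happened.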

          docs = new ArrayList<Record>();
          

          This can be made final.

              System.out.println(status.getUser().getName() + " : " + status.getText());
          

          This should be removed or converted to a log statement with more details.
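One possible shape for such a log statement is sketched below. It assumes `java.util.logging` only to stay self-contained (Flume's own code logs through SLF4J), and the class name, method names, and the fields included in the message are illustrative choices, not what the patch actually does.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; names and message format are assumptions.
class StatusLoggingSketch {
  private static final Logger LOGGER =
      Logger.getLogger(StatusLoggingSketch.class.getName());

  // Builds the message separately so it can be checked in isolation.
  static String describe(long statusId, String userName, String text) {
    return "Received status id=" + statusId + " user=" + userName
        + " textLength=" + text.length();
  }

  static void logStatus(long statusId, String userName, String text) {
    // Guard so the message is only built when FINE logging is enabled.
    if (LOGGER.isLoggable(Level.FINE)) {
      LOGGER.fine(describe(statusId, userName, text));
    }
  }
}
```

Logging at a debug-like level keeps per-tweet output out of the way at default settings while still making it available when needed.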

          // TODO: increment rawBytes?
          

          Do you want to do this or maybe remove the statement?

          private void addString(Record doc, String solr_field, String val) {
          

          This should probably just be called "field" or "avroField" rather than solr_field.

          • Is the onStatus method guaranteed to be called from only one thread? It does not look thread-safe if it can be called from multiple threads.
          • The time-based flush works only when onStatus is called: if the method is not called for hours, the data in the buffer will not be flushed to the channel until the next call. This is unlikely to happen, but is worth noting.
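Both points above (thread safety and a flush that does not depend on new callbacks) could be addressed along these lines. This is a sketch under stated assumptions, not the approach taken in the patch: `TimedBatcherSketch` and its parameters are hypothetical, and a `Consumer` stands in for handing a batch to the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper; not part of Flume or twitter4j.
class TimedBatcherSketch<T> {
  private final Object lock = new Object();
  private final int maxBatchSize;
  private final Consumer<List<T>> sink;
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();
  private List<T> buffer = new ArrayList<T>();

  // maxBatchMillis must be greater than zero.
  TimedBatcherSketch(int maxBatchSize, long maxBatchMillis,
      Consumer<List<T>> sink) {
    this.maxBatchSize = maxBatchSize;
    this.sink = sink;
    // Flush on a timer, so a quiet stream cannot strand buffered items.
    timer.scheduleWithFixedDelay(this::flush, maxBatchMillis,
        maxBatchMillis, TimeUnit.MILLISECONDS);
  }

  // Safe to call from any thread, e.g. from a listener callback.
  void add(T item) {
    boolean full;
    synchronized (lock) {
      buffer.add(item);
      full = buffer.size() >= maxBatchSize;
    }
    if (full) {
      flush();
    }
  }

  void flush() {
    List<T> batch;
    synchronized (lock) {
      if (buffer.isEmpty()) {
        return;
      }
      // Swap in a fresh buffer so the sink runs outside the lock.
      batch = buffer;
      buffer = new ArrayList<T>();
    }
    sink.accept(batch);
  }

  // Stops the timer and delivers whatever is still buffered.
  void stop() {
    timer.shutdownNow();
    flush();
  }
}
```

Because the sink is invoked outside the lock, batches flushed by different threads may reach it concurrently, so the sink itself has to tolerate that. An exception thrown by the sink on the timer thread also cancels further timed flushes, which a production version would need to handle.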
              ByteArrayOutputStream out = new ByteArrayOutputStream();
              DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
              DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
              dataFileWriter.create(avroSchema, out);
              for (Record doc2 : docList) {
                dataFileWriter.append(doc2);
              }
              dataFileWriter.close();
          

          Can't all of these be reused, or is there a thread-safety aspect to it? You could simply use the flush method of the ByteArrayOutputStream (BAOS) to make sure the internal buffer is flushed to the byte array.
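The reuse idea can be illustrated with standard-library classes only. In this sketch a `DataOutputStream` is swapped in for Avro's `DataFileWriter` purely to keep the example self-contained; whether the Avro writer can be reused the same way is not established here. `ReusableEncoderSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Hypothetical helper; DataOutputStream stands in for the Avro writer.
class ReusableEncoderSketch {
  // Allocated once and reused for every batch. Not thread-safe.
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private final DataOutputStream writer = new DataOutputStream(out);

  byte[] encode(List<String> batch) throws IOException {
    // Drop the bytes left over from the previous batch.
    out.reset();
    for (String doc : batch) {
      writer.writeUTF(doc);
    }
    // Push anything held by the wrapper into the byte array stream.
    writer.flush();
    // toByteArray() copies, so the caller owns the returned array.
    return out.toByteArray();
  }
}
```

This only works if `encode` is called from a single thread at a time, which is the thread-safety aspect the question above raises.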

          • Could you also annotate in the test that it runs only if the user has the relevant system properties set and has Twitter access? It may be worth considering mocking the Twitter classes.
          • Another improvement, maybe for the future, is to make the serialization pluggable, so that the user can plug in a serializer rather than being forced to use Avro (like the HTTP Source and Spool Dir Source).
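A pluggable serializer might look roughly like the sketch below. The interface and class names are hypothetical and do not correspond to any existing Flume API; the newline-joined text serializer is just one example implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical interface; not part of Flume.
interface BatchSerializerSketch<T> {
  byte[] serialize(List<T> batch);
}

// One possible implementation: newline-separated UTF-8 text.
class LineSerializerSketch implements BatchSerializerSketch<String> {
  @Override
  public byte[] serialize(List<String> batch) {
    return String.join("\n", batch).getBytes(StandardCharsets.UTF_8);
  }
}

// The source depends only on the interface, not on a concrete format.
class PluggableSourceSketch<T> {
  private final BatchSerializerSketch<T> serializer;

  PluggableSourceSketch(BatchSerializerSketch<T> serializer) {
    this.serializer = serializer;
  }

  byte[] toEventBody(List<T> batch) {
    return serializer.serialize(batch);
  }
}
```

An Avro-based implementation of the same interface could then remain the default, while users supply their own serializer through configuration.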

          Otherwise, looks good to go.

          rvs Roman Shaposhnik added a comment -

          Hari Shreedharan, thanks for the excellent, detailed feedback! I think I took care of most of it. Attached are two files: FLUME-2190.patch.txt is a patch against trunk, and 0001-FLUME-2190.-add-a-source-capable-of-feeding-off-of-t.patch is a diff between the previous and the current versions of the patch.

          hshreedharan Hari Shreedharan added a comment -

          Thanks Roman Shaposhnik for the updated patch. So the onStatus method need not be thread safe as it is called only from one thread - is that correct?

          rvs Roman Shaposhnik added a comment -

          Hari Shreedharan, it seems to be that way, yes. There's no firm guarantee in the docs, if you're looking for that, so the twitter4j implementation is the only source of inspiration.

          hshreedharan Hari Shreedharan added a comment -

          OK, let's go with this then.

          hshreedharan Hari Shreedharan added a comment -

          +1. Running tests and committing.

          hshreedharan Hari Shreedharan added a comment -

          Committed, rev: 0f4a66f. Thanks Roman!

          hudson Hudson added a comment -

          UNSTABLE: Integrated in flume-trunk #501 (See https://builds.apache.org/job/flume-trunk/501/)
          FLUME-2190. Add a source capable of feeding off of the Twitter Streaming API (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=0f4a66fb0f2946cd61dd8df31bd255fef7581cbc)

          • flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java
          • flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf
          • flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java
          • flume-ng-sources/flume-twitter-source/pom.xml
          • flume-ng-dist/pom.xml
          • flume-ng-sources/pom.xml
          • flume-ng-doc/sphinx/FlumeUserGuide.rst
          • pom.xml
          • flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties

            People

            • Assignee: rvs Roman Shaposhnik
            • Reporter: rvs Roman Shaposhnik
            • Votes: 0
            • Watchers: 6
