  1. Spark
  2. SPARK-13065

streaming-twitter pass twitter4j.FilterQuery argument to TwitterUtils.createStream()

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels:
    • Environment:

      all

      Description

      The Twitter streaming API is very powerful and provides a lot of support for twitter.com-side filtering of status objects. Whenever possible, we want to let Twitter do as much of the work for us as it can.

      Currently the Spark Twitter API only allows you to configure a small subset of the possible filters:

      String[] filters = {"tag1", "tag2"};
      JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

      The current implementation does:

      private[streaming]
      class TwitterReceiver(
          twitterAuth: Authorization,
          filters: Seq[String],
          storageLevel: StorageLevel
        ) extends Receiver[Status](storageLevel) with Logging {

        ...

        val query = new FilterQuery
        if (filters.size > 0) {
          query.track(filters.mkString(","))
          newTwitterStream.filter(query)
        } else {
          newTwitterStream.sample()
        }

        ...

      Rather than constructing the FilterQuery object inside TwitterReceiver.onStart(), we should be able to pass in a FilterQuery object that the caller has already configured.
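
      To make the difference concrete, here is a minimal, self-contained sketch of the two code paths. It is not the Spark or twitter4j code: QuerySketch and StreamSketch are hypothetical stand-ins for twitter4j.FilterQuery and the Twitter stream so that the example runs without the twitter4j jar, and the method names startWithKeywords and startWithQuery are made up for illustration. Falling back to sample() when no query is supplied is also an assumption, chosen to mirror the existing else branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch only. QuerySketch and StreamSketch are hypothetical stand-ins for
 * twitter4j.FilterQuery and the Twitter stream; they are not the real classes.
 */
final class ReceiverQuerySketch {

    /** Stand-in for a filter query: keywords plus one extra server-side filter. */
    static final class QuerySketch {
        final List<String> track = new ArrayList<>();
        final List<String> languages = new ArrayList<>();

        QuerySketch track(String... terms) {
            track.addAll(Arrays.asList(terms));
            return this;
        }

        QuerySketch language(String... codes) {
            languages.addAll(Arrays.asList(codes));
            return this;
        }
    }

    /** Stand-in for the stream: records which call the receiver made. */
    static final class StreamSketch {
        String lastCall = "none";
        QuerySketch lastQuery;

        void filter(QuerySketch query) {
            lastCall = "filter";
            lastQuery = query;
        }

        void sample() {
            lastCall = "sample";
        }
    }

    /** Current behaviour as quoted above: the receiver builds the query from keywords only. */
    static void startWithKeywords(StreamSketch stream, List<String> filters) {
        if (!filters.isEmpty()) {
            // Mirrors filters.mkString(","): all keywords are joined into one track term.
            QuerySketch query = new QuerySketch().track(String.join(",", filters));
            stream.filter(query);
        } else {
            stream.sample();
        }
    }

    /** Proposed behaviour: the caller builds the query and the receiver passes it through. */
    static void startWithQuery(StreamSketch stream, QuerySketch query) {
        if (query != null) {
            stream.filter(query);
        } else {
            // Assumption: no query means the unfiltered sample, as in the existing else branch.
            stream.sample();
        }
    }

    public static void main(String[] args) {
        StreamSketch current = new StreamSketch();
        startWithKeywords(current, Arrays.asList("tag1", "tag2"));
        System.out.println(current.lastCall + " track=" + current.lastQuery.track);

        StreamSketch proposed = new StreamSketch();
        startWithQuery(proposed, new QuerySketch().track("tag1", "tag2").language("en"));
        System.out.println(proposed.lastCall + " track=" + proposed.lastQuery.track
                + " languages=" + proposed.lastQuery.languages);
    }
}
```

      With the pass-through variant, any filter the query object supports (the language setting here is just one example) reaches the stream untouched, whereas the keyword path can only ever populate track.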

      This looks like an easy fix. See the source code links below.

      kind regards

      Andy

      https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60

      https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89


      Update 2/2/16:
      Attached is my Java implementation for this problem. Feel free to reuse it however you like. In the main() of my Spark streaming app I have the following code:

      FilterQuery query = config.getFilterQuery().fetch();
      if (query != null) {
          // TODO https://issues.apache.org/jira/browse/SPARK-13065
          tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
      } /* else
          Spark native API:
          String[] filters = {"tag1", "tag2"};

          tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

          see https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89

          causes
          val query = new FilterQuery
          if (filters.size > 0) {
            query.track(filters.mkString(","))
            newTwitterStream.filter(query)
          }
      */

        Attachments

        1. twitterFilterQueryPatch.tar.gz
          2 kB
          Andrew Davidson

              People

              • Assignee:
                Unassigned
                Reporter:
                aedwip Andrew Davidson
              • Votes:
                1
                Watchers:
                4

                  Time Tracking

                  Estimated:
                  2h
                  Remaining:
                  2h
                  Logged:
                  Not Specified