Spark / SPARK-13065

streaming-twitter: pass a twitter4j.FilterQuery argument to TwitterUtils.createStream()


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.6.0
    • None
    • Component/s: DStreams
    • all

    Description

      The Twitter streaming API is very powerful and provides a lot of support for server-side (twitter.com) filtering of Status objects. Whenever possible, we want to let Twitter do as much of that work as possible for us.

      Currently, the Spark Twitter API only lets you configure a small subset of the possible filters, namely track keywords:

      String[] filters = {"tag1", "tag2"};
      JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

      The current implementation does the following:

      private[streaming]
      class TwitterReceiver(
          twitterAuth: Authorization,
          filters: Seq[String],
          storageLevel: StorageLevel
        ) extends Receiver[Status](storageLevel) with Logging {

        . . .

        val query = new FilterQuery
        if (filters.size > 0) {
          query.track(filters.mkString(","))
          newTwitterStream.filter(query)
        } else {
          newTwitterStream.sample()
        }

        ...
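To make the limitation concrete, the branch in the receiver can be modeled with the JDK alone. This is a sketch, not Spark code: `CurrentReceiverModel` is a hypothetical name, and the strings it returns are labels standing in for Twitter4J's `TwitterStream.filter(query)` and `TwitterStream.sample()` calls.

```java
import java.util.List;

// Stdlib-only model of the branch in TwitterReceiver.onStart().
// HYPOTHETICAL: the returned strings only label which Twitter4J call
// would be made; nothing here talks to Twitter, and this is not Spark code.
final class CurrentReceiverModel {

    private CurrentReceiverModel() {}

    static String startStream(List<String> filters) {
        if (!filters.isEmpty()) {
            // Mirrors query.track(filters.mkString(",")): every filter becomes
            // part of one comma-separated track string. Other FilterQuery
            // fields (follow, locations, language) cannot be reached this way.
            String track = String.join(",", filters);
            return "filter(track=" + track + ")";
        }
        // No filters: fall back to the unfiltered sample stream.
        return "sample()";
    }
}
```

Whatever the caller passes in, the only part of the query that ever gets populated is the track keyword list.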

      Rather than constructing the FilterQuery object inside TwitterReceiver.onStart(), we should be able to pass in a FilterQuery object.
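One possible shape for this proposal, again as a JDK-only sketch under stated assumptions: `QuerySpec` is a hypothetical stand-in for `twitter4j.FilterQuery` (only track keywords and follow IDs are modeled), and `ProposedReceiverModel` is not Spark's API. The caller builds the query; the receiver only chooses between a filtered stream and the sample stream, and the existing `String[]` entry point becomes a thin wrapper that builds a track-only query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// HYPOTHETICAL stand-in for twitter4j.FilterQuery, reduced to two fields.
final class QuerySpec {
    final List<String> trackKeywords = new ArrayList<>();
    final List<Long> followIds = new ArrayList<>();

    QuerySpec track(String... keywords) {
        trackKeywords.addAll(Arrays.asList(keywords));
        return this;
    }

    QuerySpec follow(long... userIds) {
        for (long id : userIds) {
            followIds.add(id);
        }
        return this;
    }

    // In this model, a query with no keywords and no user IDs counts as empty.
    boolean isEmpty() {
        return trackKeywords.isEmpty() && followIds.isEmpty();
    }
}

// HYPOTHETICAL receiver shape, not Spark's API.
final class ProposedReceiverModel {

    private ProposedReceiverModel() {}

    // Proposed entry point: the caller supplies the whole query;
    // null or empty means "no server-side filtering".
    static String startStream(QuerySpec query) {
        if (query == null || query.isEmpty()) {
            return "sample()";
        }
        return "filter(track=" + query.trackKeywords + ", follow=" + query.followIds + ")";
    }

    // Existing keyword-only entry point kept for compatibility:
    // it builds a track-only query and delegates.
    static String startStream(String[] filters) {
        return startStream(new QuerySpec().track(filters));
    }
}
```

For example, `ProposedReceiverModel.startStream(new QuerySpec().follow(12L))` describes a follow-only filter, which the keyword-only signature cannot express. Keeping the `String[]` overload as a wrapper means existing callers see no change in behavior.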

      This looks like an easy fix. See the source code links below.

      Kind regards,

      Andy

      https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60

      https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89


      Update 2/2/16:
      Attached is my Java implementation for this problem. Feel free to reuse it however you like. In my Spark Streaming app's main() I have the following code:

      FilterQuery query = config.getFilterQuery().fetch();
      if (query != null) {
          // TODO https://issues.apache.org/jira/browse/SPARK-13065
          tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
      } /* else {
          // Spark native API
          String[] filters = {"tag1", "tag2"};
          tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);

          // see https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
          // which causes:
          //   val query = new FilterQuery
          //   if (filters.size > 0) {
          //     query.track(filters.mkString(","))
          //     newTwitterStream.filter(query)
          //   }
      } */

      Attachments

        1. twitterFilterQueryPatch.tar.gz (2 kB, Andrew Davidson)


            People

              Assignee: Unassigned
              Reporter: Andrew Davidson (aedwip)
              Votes: 1
              Watchers: 4

                Time Tracking

                  Original Estimate: 2h
                  Remaining Estimate: 2h
                  Time Spent: Not Specified