Twitter Trends using Spark Streaming

  • This tutorial shows how to process live tweets for trend analysis using the Spark Python API
  • The Spark 1.6 streaming API in Python does not support Twitter integration; that is only available in the Scala API
  • This tutorial provides a simple workaround. It is a sample program, not a production-ready example
  • The streaming example uses a socket stream and listens to a TCP server, which connects to Twitter on its behalf and forwards the tweets to this socket stream listener

Create a Twitter Broker

  • The broker is a Python client that connects to the Twitter stream using the tweepy library and forwards all tweets to the Spark socket stream listener
  • Install the tweepy library, if it is not already installed
    • pip install tweepy
  • Save the following file as TweetRead.py
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = ''
consumer_secret = ''
access_token = ''
access_secret = ''

# Tweepy stream listener that forwards the text of each incoming tweet to the connected client socket
class TweetsListener(StreamListener):

  def __init__(self, csocket):
      self.client_socket = csocket

  def on_data(self, data):
      try:
          msg = json.loads( data )
          print( msg['text'].encode('utf-8') )
          self.client_socket.send( msg['text'].encode('utf-8') )
          return True
      except BaseException as e:
          print("Error on_data: %s" % str(e))
      return True

  def on_error(self, status):
      print(status)
      return True

# Authenticate with Twitter and stream tweets matching the track keyword into the client socket
def sendData(c_socket):
  auth = OAuthHandler(consumer_key, consumer_secret)
  auth.set_access_token(access_token, access_secret)

  twitter_stream = Stream(auth, TweetsListener(c_socket))
  twitter_stream.filter(track=['trump'])

if __name__ == "__main__":
  s = socket.socket()         # Create a socket object
  host = "192.168.0.139"      # Get local machine name
  port = 5555                 # Reserve a port for your service.
  s.bind((host, port))        # Bind to the port

  print("Listening on port: %s" % str(port))

  s.listen(5)                 # Now wait for client connection.
  c, addr = s.accept()        # Establish connection with client.

  print( "Received request from: " + str( addr ) )

  sendData( c )
  • Change track=['trump'] to an appropriate search keyword, and change the host variable to the IP address of the machine where this program is going to run.
  • Run the program as
    • python TweetRead.py
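Before pointing Spark at the broker, you can sanity-check that tweets are actually flowing by connecting to it with a plain socket client. The sketch below is only for verification (the host and port must match the values hard-coded in TweetRead.py); it is not part of the tutorial's pipeline:

import socket

# Connect to the broker started by TweetRead.py (host/port must match its settings)
test_client = socket.socket()
test_client.connect(("192.168.0.139", 5555))

# Print a few chunks of forwarded tweet text, then disconnect
for _ in range(5):
    chunk = test_client.recv(4096)
    print(chunk.decode("utf-8", errors="replace"))
test_client.close()

Note that TweetRead.py accepts only a single client connection, so restart the broker after this check before starting the Spark listener.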

Create a Spark Streaming Program

  • Create a new IPython notebook Spark program and write the following code

Import all Spark streaming libraries

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
In [2]:
sc
Out[2]:
<pyspark.context.SparkContext at 0x7f4c64d57390>

Create a Spark streaming context

  • Use a batch interval of 10 seconds: messages accumulate for 10 seconds and are then processed as one batch.
  • Create an SQLContext, which will be used to query the trends from the results.
  • The checkpoint directory saves the received messages so they can be recovered if a streaming component fails.
In [3]:
ssc = StreamingContext(sc, 10 )
sqlContext = SQLContext(sc)
ssc.checkpoint( "file:///home/hadoop/lab/programs/trends/checkpoint")

Connect to the broker to receive the tweets

  • This should be the IP address and port number of your Windows or Mac machine, where the program TweetRead.py is running
In [4]:
socket_stream = ssc.socketTextStream("100.86.241.236", 5555)

Create a window

  • An RDD is created every 10 seconds, but the data in the RDD covers the last 20 seconds
In [5]:
lines = socket_stream.window( 20 )
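Here window() is given only the window length, so the slide interval defaults to the batch interval of the streaming context (10 seconds). If you prefer to make both values explicit, the DStream API also accepts the slide duration as a second argument; a minimal equivalent sketch:

# window(windowLength, slideDuration): every 10 seconds, produce an RDD
# covering the tweets received in the last 20 seconds
lines = socket_stream.window( 20, 10 )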

Create a namedtuple

  • To store the final tags and their counts as results
In [6]:
from collections import namedtuple
fields = ("tag", "count" )
Tweet = namedtuple( 'Tweet', fields )

Process the stream

  • It receives tweet messages, splits the messages into words, filters for the words that start with a hashtag (#), converts them to lowercase, maps each tag to (word, 1), then reduces by key to count the occurrences of each hashtag.
  • Finally it converts the resulting tags and their counts into a DataFrame, sorts the tags in descending order of count, takes only the first 10 records, and registers the result as a temp table.
In [7]:
( lines.flatMap( lambda text: text.split( " " ) )
  .filter( lambda word: word.lower().startswith("#") )
  .map( lambda word: ( word.lower(), 1 ) )
  .reduceByKey( lambda a, b: a + b )
  .map( lambda rec: Tweet( rec[0], rec[1] ) )
  .foreachRDD( lambda rdd: rdd.toDF().sort( desc("count") )
              .limit(10).registerTempTable("tweets") ) )
In [8]:
sqlContext
Out[8]:
<pyspark.sql.context.SQLContext at 0x7f4c4c65f9e8>

Start the streaming process

In [9]:
ssc.start()             # Start the computation
In [10]:
import matplotlib.pyplot as plt
import seaborn as sn
%matplotlib inline
In [11]:
import time
from IPython import display


count = 0
while count < 10:
  time.sleep( 20 )
  top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' )
  top_10_df = top_10_tweets.toPandas()
  display.clear_output(wait=True)
  plt.figure( figsize = ( 10, 8 ) )
  sn.barplot( x="count", y="tag", data=top_10_df)
  plt.show()
  count = count + 1
In [ ]:
#ssc.stop()
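When you are done, stop the streaming context. A minimal sketch using the keyword arguments of StreamingContext.stop(); passing stopSparkContext=False keeps the underlying SparkContext alive so the notebook can still be used afterwards:

# Stop only the streaming computation; keep the SparkContext for further work
ssc.stop(stopSparkContext=False, stopGraceFully=True)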