pyspark-notebook Docker Stack

- Inside the container, install Tweepy: pip install tweepy
- starttweetstream.py contains a modified version of the TweetListener class from our Data Mining Twitter presentation.
- keys.py contains your Twitter credentials.
- cd work, then run the script as: ipython starttweetstream.py number_of_tweets search_terms
  For example: ipython starttweetstream.py 1000 football
- The script displays "Waiting for connection" until Spark connects to begin streaming the tweets.
- on_status extracts the hashtags, converts them to lowercase and creates a space-separated string of hashtags to send to Spark.
- on_status uses the connection's send method to send that string to whatever is reading from the socket.
- send expects a sequence of bytes as its argument; hashtags_string.encode('utf-8') converts the string to bytes.
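For instance, here is what that encoding step produces (a standalone snippet, not part of the script):

text = 'python spark\n'
data = text.encode('utf-8')   # str -> bytes for socket's send
print(data)                   # b'python spark\n'
print(data.decode('utf-8'))   # round-trip back to str

The full script follows.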
# starttweetstream.py
"""Script to get tweets on topic(s) specified as script argument(s)
and send tweet text to a socket for processing by Spark."""
import keys
import socket
import sys
import tweepy

class TweetListener(tweepy.StreamListener):
    """Handles incoming Tweet stream."""

    def __init__(self, api, connection, limit=10000):
        """Create instance variables for tracking number of tweets."""
        self.connection = connection
        self.tweet_count = 0
        self.TWEET_LIMIT = limit  # 10,000 by default
        super().__init__(api)  # call superclass's init

    def on_connect(self):
        """Called when your connection attempt is successful, enabling
        you to perform appropriate application tasks at that point."""
        print('Successfully connected to Twitter\n')

    def on_status(self, status):
        """Called when Twitter pushes a new tweet to you."""
        # get the hashtags
        hashtags = []

        for hashtag_dict in status.entities['hashtags']:
            hashtags.append(hashtag_dict['text'].lower())

        hashtags_string = ' '.join(hashtags) + '\n'
        print(f'Screen name: {status.user.screen_name}:')
        print(f'   Hashtags: {hashtags_string}')
        self.tweet_count += 1  # track number of tweets processed

        try:
            # send requires bytes, so encode the string in utf-8 format
            self.connection.send(hashtags_string.encode('utf-8'))
        except Exception as e:
            print(f'Error: {e}')

        # if TWEET_LIMIT is reached, return False to terminate streaming
        return self.tweet_count < self.TWEET_LIMIT

    def on_error(self, status):
        print(status)
        return True

if __name__ == '__main__':
    tweet_limit = int(sys.argv[1])  # get maximum number of tweets
    client_socket = socket.socket()  # create a socket

    # app will use localhost (this computer) port 9876
    client_socket.bind(('localhost', 9876))

    print('Waiting for connection')
    client_socket.listen()  # wait for client to connect

    # when a connection is received, accept it and get the client's address
    connection, address = client_socket.accept()
    print(f'Connection received from {address}')

    # configure Twitter access
    auth = tweepy.OAuthHandler(keys.consumer_key, keys.consumer_secret)
    auth.set_access_token(keys.access_token, keys.access_token_secret)

    # configure Tweepy to wait if Twitter rate limits are reached
    api = tweepy.API(auth, wait_on_rate_limit=True,
                     wait_on_rate_limit_notify=True)

    # create the Stream
    twitter_stream = tweepy.Stream(api.auth,
        TweetListener(api, connection, tweet_limit))

    # sys.argv[2] is the first search term
    twitter_stream.filter(track=sys.argv[2:])

    # call close on the socket objects to release their resources
    connection.close()
    client_socket.close()
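Before wiring Spark in, you can check what the script sends by connecting a plain socket client that plays the role socketTextStream will play later. This is a hypothetical test harness (reader_demo.py is not part of the presentation):

# reader_demo.py -- hypothetical test client; start starttweetstream.py first
import socket

client = socket.socket()
client.connect(('localhost', 9876))  # the host/port the script binds

buffer = b''
while True:
    data = client.recv(1024)  # read raw bytes from the socket
    if not data:
        break  # the script closed the connection
    buffer += data
    # each tweet's hashtag string is newline-terminated
    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        print(line.decode('utf-8'))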
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
- We'll query the streamed data through a DataFrame, which gives a table view of the underlying RDDs.
- A SparkSession is used to create a DataFrame from an RDD.
- There can be only one SparkSession per notebook or app.
- The following utility function gets the SparkSession instance if it already exists or creates one if it does not yet exist:

def getSparkSessionInstance(sparkConf):
    """Spark Streaming Programming Guide's recommended method
    for getting an existing SparkSession or creating a new one."""
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()

    return globals()["sparkSessionSingletonInstance"]
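You can confirm the singleton behavior directly; a minimal sketch, assuming pyspark is installed and no conflicting session is active:

from pyspark import SparkConf

spark_a = getSparkSessionInstance(SparkConf())
spark_b = getSparkSessionInstance(SparkConf())
print(spark_a is spark_b)  # True -- the same SparkSession is reused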
def display_barplot(spark_df, x, y, time, scale=1.0, size=(8, 4.5)):
    """Displays a Spark DataFrame's contents as a bar plot."""
    df = spark_df.toPandas()

    # remove prior graph when new one is ready to display
    display.clear_output(wait=True)
    print(f'TIME: {time}')

    # create and configure a Figure containing a Seaborn barplot
    plt.figure(figsize=size)
    sns.set(font_scale=scale)
    barplot = sns.barplot(data=df, x=x, y=y,
                          palette=sns.color_palette('cool', 20))

    # rotate the x-axis labels 90 degrees for readability
    for item in barplot.get_xticklabels():
        item.set_rotation(90)

    plt.tight_layout()
    plt.show()
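For example, you might exercise display_barplot with a hand-built DataFrame before streaming begins (hypothetical data; assumes a SparkSession named spark already exists):

demo_df = spark.createDataFrame(
    [('python', 12), ('spark', 9), ('bigdata', 5)],
    ['hashtag', 'total'])  # column names match the x and y arguments below
display_barplot(demo_df, x='hashtag', y='total', time='demo run')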
- A DStream is a sequence of RDDs; each DStream represents a mini-batch of data to process.
- count_tags is called for each mini-batch with the batch's time and RDD; it counts the hashtags and displays the top 20 so far (using a SparkSession to create a DataFrame from the RDD):

def count_tags(time, rdd):
    """Count hashtags and display top-20 in descending order."""
    try:
        # get SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # map hashtag string-count tuples to Rows
        rows = rdd.map(
            lambda tag: Row(hashtag=tag[0], total=tag[1]))

        # create a DataFrame from the Row objects
        hashtags_df = spark.createDataFrame(rows)

        # create a temporary table view for use with Spark SQL
        hashtags_df.createOrReplaceTempView('hashtags')

        # use Spark SQL to get the top 20 hashtags in descending order
        top20_df = spark.sql(
            """select hashtag, total
               from hashtags
               order by total desc, hashtag asc
               limit 20""")
        display_barplot(top20_df, x='hashtag', y='total', time=time)
    except Exception as e:
        print(f'Exception: {e}')
- count_tags gets the SparkSession by calling getSparkSessionInstance with the SparkContext's configuration information (the RDD exposes the SparkContext via its context attribute).
- It maps the RDD's data to Row objects; the RDDs in this example contain tuples of hashtags and counts, and the Row constructor uses the keyword argument names as column names.
- It creates a DataFrame containing the Row objects for use with Spark SQL, plus a temporary table view so the DataFrame can be queried like a table in a relational database.
- The SparkSession's sql method performs the query and returns a new DataFrame containing the results.
- Finally, it passes that DataFrame to the display_barplot utility function.
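The same top-20 query could be written with the DataFrame API instead of Spark SQL; a sketch (not the presentation's approach), assuming the hashtags_df created in count_tags:

from pyspark.sql import functions as F

top20_df = hashtags_df.orderBy(
    F.desc('total'), F.asc('hashtag')).limit(20)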
SparkContext

This notebook reads the data sent by the starttweetstream.py script and specifies how to process the tweets. First, create the SparkContext:

sc = SparkContext()
Create a StreamingContext with the following arguments: the SparkContext and the batch interval in seconds (here, 10) for each mini-batch of data:

ssc = StreamingContext(sc, 10)

Set up a checkpoint folder, which Spark Streaming requires for the stateful transformation updateStateByKey used below:

import time
ssc.checkpoint(f'checkpoint{time.time()}')
StreamingContext's socketTextStream method connects to a socket from which a stream of data will be received and returns a DStream that receives the data:

stream = ssc.socketTextStream('localhost', 9876)
Call methods on the DStream to specify the processing steps for the streaming data. flatMap tokenizes the space-separated lines of hashtags into a new DStream of individual hashtags:

tokenized = stream.flatMap(lambda line: line.split())
map pairs each hashtag with an initial count of 1, producing hashtag-count tuples:

mapped = tokenized.map(lambda hashtag: (hashtag, 1))
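To see what these two transformations do, here is the equivalent logic applied to one hypothetical mini-batch in plain Python:

lines = ['python spark\n', 'python\n']  # what socketTextStream delivers
tokenized = [tag for line in lines for tag in line.split()]  # flatMap
mapped = [(tag, 1) for tag in tokenized]                     # map
print(mapped)  # [('python', 1), ('spark', 1), ('python', 1)]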
updateStateByKey receives a two-argument lambda that totals a hashtag's new counts and adds them to its prior total; the prior total is None the first time a key is seen, which the expression (prior_total or 0) handles:

hashtag_counts = mapped.updateStateByKey(
    lambda counts, prior_total: sum(counts) + (prior_total or 0))
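The lambda's behavior across mini-batches can be checked in isolation (hypothetical counts):

update = lambda counts, prior_total: sum(counts) + (prior_total or 0)
print(update([1, 1], None))  # first batch for a hashtag: prior is None -> 2
print(update([1], 2))        # a later batch: 1 new occurrence + prior 2 -> 3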
foreachRDD passes every processed RDD to the function count_tags, which summarizes the top-20 hashtags so far in a bar plot:

hashtag_counts.foreachRDD(count_tags)
StreamingContext's start method begins the streaming process:

ssc.start()  # start the Spark streaming
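When you're finished, you can shut the stream down; a minimal sketch (not in the original notebook) that stops streaming gracefully while keeping the SparkContext alive:

ssc.stop(stopSparkContext=False, stopGraceFully=True)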
©1992–2020 by Pearson Education, Inc. All Rights Reserved. This content is based on Chapter 5 of the book Intro to Python for Computer Science and Data Science: Learning to Program with AI, Big Data and the Cloud.
DISCLAIMER: The authors and publisher of this book have used their best efforts in preparing the book. These efforts include the development, research, and testing of the theories and programs to determine their effectiveness. The authors and publisher make no warranty of any kind, expressed or implied, with regard to these programs or to the documentation contained in these books. The authors and publisher shall not be liable in any event for incidental or consequential damages in connection with, or arising out of, the furnishing, performance, or use of these programs.