Instructor Notes:

  • This notebook should be executed in the jupyter/pyspark-notebook Docker stack, which is configured with Spark and PySpark
  • The notes on doing that are provided in this notebook and in the book chapter
In [1]:
# enable high-res images in notebook 
%config InlineBackend.figure_format = 'retina'

17.6 Spark

  • Use PySpark and Spark functional-style programming to summarize word frequencies in Romeo and Juliet
  • Hadoop breaks tasks into pieces that do lots of disk I/O across many computers
  • Spark performs certain big-data tasks in memory for better performance

17.6.1 Spark Overview

  • In big data, performance is crucial
  • Hadoop is geared to disk-based batch processing
    • read data from disk
    • process the data
    • write results back to disk
  • Many big-data applications demand better performance
    • E.g., fast streaming applications requiring real-time or near-real-time processing won’t work in a disk-based architecture

Architecture and Components

  • Its in-memory architecture “has been used to sort 100 TB of data 3X faster than Hadoop MapReduce on 1/10th of the machines” [2]
  • Runs some workloads up to 100 times faster than Hadoop [3]
  • Spark uses resilient distributed datasets (RDDs) to process distributed data with functional-style programming
  • Hadoop uses replication for fault tolerance — adds more disk-based overhead
  • RDDs eliminate disk-based overhead by
    • remaining in memory — use disk only if data can't fit in memory
    • not replicating data
  • Fault tolerance — Spark remembers steps used to create an RDD
    • If a node fails, Spark rebuilds the RDD [1]
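  • As a minimal, hedged sketch of these RDD ideas (standalone, not part of this chapter’s word-count example), the following PySpark snippet builds an RDD with lazy, functional-style transformations; Spark records the lineage of those transformations and can recompute lost partitions from that lineage:

      from pyspark import SparkConf, SparkContext

      # configure and start a local SparkContext (the word-count example
      # later in this section creates its own SparkContext the same way)
      configuration = SparkConf().setAppName('RDDSketch').setMaster('local[*]')
      sc = SparkContext(conf=configuration)

      # transformations such as map and filter are lazy -- they only extend
      # the RDD's lineage (the recipe Spark can replay if a partition is lost)
      numbers = sc.parallelize(range(1, 11))        # distribute a local collection
      squares = numbers.map(lambda x: x * x)        # square each element
      evens = squares.filter(lambda x: x % 2 == 0)  # keep only the even squares

      # an action such as collect triggers the actual computation
      print(evens.collect())  # [4, 16, 36, 64, 100]

      sc.stop()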

Architecture and Components (cont.)

  • Spark distributes operations to a cluster’s nodes for parallel execution
  • Spark streaming enables you to process data as it’s received
  • Spark DataFrames (similar to pandas DataFrames) enable you to manipulate RDDs as a collection of named columns
  • Can use Spark DataFrames with Spark SQL to query distributed data (see the sketch after this list)
  • Spark MLlib (the Spark Machine Learning Library) enables you to perform machine-learning algorithms on distributed data
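  • As a brief, hedged illustration of the DataFrame and Spark SQL bullets above (the names and data here are made up for the sketch), assuming a local Spark installation such as the jupyter/pyspark-notebook stack:

      from pyspark.sql import SparkSession

      # create a local SparkSession -- the entry point for DataFrames and Spark SQL
      spark = SparkSession.builder.appName('DataFrameSketch') \
                                  .master('local[*]').getOrCreate()

      # build a Spark DataFrame from an in-memory list of rows with named columns
      people = spark.createDataFrame(
          [('Alice', 34), ('Bob', 45), ('Carol', 29)], ['name', 'age'])

      # register the DataFrame as a temporary view, then query it with Spark SQL
      people.createOrReplaceTempView('people')
      spark.sql('SELECT name, age FROM people WHERE age > 30').show()

      spark.stop()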

Providers

  • Hadoop providers typically also provide Spark support
  • Databricks
    • A Spark-specific vendor
    • Their website is an excellent resource for learning Spark
    • Paid version runs on Amazon AWS or Microsoft Azure
    • Free Databricks Community Edition is a great way to get started with both Spark and the Databricks environment
    • Databricks free e-books

17.6.2 Docker and the Jupyter Docker Stacks

Docker

  • Docker is a tool for packaging software into containers (created from images) that bundle everything required to execute that software across platforms.
  • Some software packages we use in this chapter require complicated setup and configuration.
  • For many of these, there are preexisting Docker containers that you can download for free and execute locally on your desktop or notebook computers.
  • These help you get started with new technologies quickly and conveniently.

Docker (cont.)

  • Docker also helps with reproducibility in research and analytics studies.
  • You can create custom Docker containers that are configured with the versions of every piece of software and every library you used in your study.
  • This would enable others to recreate the environment you used, then reproduce your work, and will help you reproduce your results at a later time.
  • We’ll use Docker in this section to download and execute a Docker container that’s preconfigured to run Spark applications.

Installing Docker

Jupyter Docker Stacks

  • The Jupyter Notebooks team has preconfigured several Jupyter “Docker stacks” containers for common Python development scenarios.
  • Each enables you to use Jupyter Notebooks to experiment with powerful capabilities without having to worry about complex software setup issues.
  • In each case, you can open JupyterLab in your web browser, open a notebook in JupyterLab and start coding.
  • JupyterLab also provides a Terminal window that you can use in your browser like your computer’s Terminal, Anaconda Command Prompt or shell.

Jupyter Docker Stacks (cont.)

  • We’ll use the jupyter/pyspark-notebook Docker stack, which is preconfigured with everything you need to create and test Apache Spark apps on your computer.
  • If you also install the other Python libraries we’ve used throughout the book, you can implement most of this book’s examples using this container.
  • More about the available Docker stacks

Run Jupyter Docker Stack

  • Before performing the next step, ensure that JupyterLab is not currently running on your computer.
  • Let’s download and run the jupyter/pyspark-notebook Docker stack.
  • To ensure that you do not lose your work when you close the Docker container, we’ll attach a local file-system folder to the container and use it to save your notebook—in the command below, Windows users should replace the line-continuation character \ with ^
    • Note: You should replace "fullPathToTheFolderYouWantToUse" with the actual full path of a folder on your system—in this case, the one containing the ch17 examples
      docker run -p 8888:8888 -p 4040:4040 -it --user root \
      -v fullPathToTheFolderYouWantToUse:/home/jovyan/work \
      jupyter/pyspark-notebook:14fdfbf9cfc1 start.sh jupyter lab

Run Jupyter Docker Stack (cont.)

  • The first time you run the preceding command, Docker will download the Docker container named:
    jupyter/pyspark-notebook:14fdfbf9cfc1
  • The notation ":14fdfbf9cfc1" indicates the specific jupyter/pyspark-notebook container to download.
  • At the time of this writing, 14fdfbf9cfc1 was the newest version of the container.
  • Specifying the version as we did here helps with reproducibility.
  • Without the ":14fdfbf9cfc1" in the command, Docker will download the latest version of the container, which might contain different software versions and might not be compatible with the code you’re trying to execute.
  • The Docker container is nearly 6GB, so the initial download time will depend on your Internet connection’s speed.

Accessing the Docker Container’s Command Line (1 of 2)

  • Each Docker container has a command-line interface like the one you’ve used to run IPython throughout this book.
  • Via this interface, you can install Python packages into the Docker container and even use IPython as you’ve done previously.
  • Open a separate Anaconda Command Prompt, Terminal or shell and list the currently running Docker containers with the command:
    docker ps
  • The output of this command is wide, so the lines of text will likely wrap, as in:
CONTAINER ID        IMAGE                                   COMMAND  
           CREATED             STATUS            PORTS             
  NAMES
f54f62b7e6d5        jupyter/pyspark-notebook:14fdfbf9cfc1   "tini -g -- 
/bin/bash"  2 minutes ago      Up 2 minutes      0.0.0.0:8888->8888/tcp
  friendly_pascal

Accessing the Docker Container’s Command Line (2 of 2)

  • In the last line of our system’s output, under the column head NAMES (shown in the third line above), is the name Docker randomly assigned to the running container, friendly_pascal. The name on your system will differ.
  • To access the container’s command line, execute the following command, replacing container_name with the running container’s name:
    docker exec -it container_name /bin/bash
  • The Docker container uses Linux under the hood, so you’ll see a Linux prompt where you can enter commands.
  • The app in this section will use features of the NLTK and TextBlob libraries you used in the “Natural Language Processing” chapter.
  • Neither is preinstalled in the Jupyter Docker stacks.
  • To install NLTK and TextBlob enter the command:
    conda install -c conda-forge nltk textblob

Stopping and Restarting a Docker Container

  • Every time you start a container with docker run, Docker gives you a new instance that does not contain any libraries you installed previously.
  • For this reason, you should keep track of your container name, so you can use it from another command-line window to stop the container and restart it.
  • The following command will shut down the container
    docker stop your_container_name
  • The following command will restart the specified container
    docker restart your_container_name
  • Docker also provides a GUI app called Kitematic that you can use to manage your containers, including stopping and restarting them.
  • You can get the app from https://kitematic.com/ and access it through the Docker menu.
  • The user guide overviews how to manage containers with the tool

17.6.3 Word Count with Spark

  • Use Spark filter, map and reduce to implement a simple word count example that summarizes the words in Romeo and Juliet

Loading the NLTK Stop Words

In [2]:
import nltk
nltk.download('stopwords')
[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Out[2]:
True
In [3]:
from nltk.corpus import stopwords
stop_words = stopwords.words('english')

Configuring a SparkContext

  • A SparkContext object gives you access to Spark’s capabilities
  • Some Spark environments create a SparkContext for you, but the Jupyter Docker stack does not
  • To create a SparkContext
    • Specify the configuration options with a SparkConf object
    • setMaster specifies the Spark cluster’s URL
    • local[*] — Spark is executing on your local computer
    • * — Use same number of threads as cores on your computer
      • Simulates parallelism of Spark clusters
In [4]:
from pyspark import SparkConf
configuration = SparkConf().setAppName('RomeoAndJulietCounter')\
                           .setMaster('local[*]')
In [5]:
from pyspark import SparkContext
sc = SparkContext(conf=configuration)

Reading the Text File and Mapping It to Words

  • You work with a SparkContext using functional-style programming applied to an RDD
  • An RDD enables you to transform data stored throughout a cluster in HDFS
  • Get a new RDD representing all words in Romeo and Juliet:
In [6]:
from textblob.utils import strip_punc
In [7]:
tokenized = sc.textFile('RomeoAndJuliet.txt')\
              .flatMap(lambda line: line.lower().split())\
              .map(lambda word: strip_punc(word, all=True))

Removing the Stop Words

  • Get a new RDD with no stop words remaining:
In [8]:
filtered = tokenized.filter(lambda word: word not in stop_words)

Counting Each Remaining Word

  • Now we can count the number of occurrences of each word
  • First map each word to a tuple containing the word and 1
  • reduceByKey with the operator module’s add function as an argument adds the counts for tuples that contain the same key (word)
In [9]:
from operator import add
word_counts = filtered.map(lambda word: (word, 1)).reduceByKey(add)

Keeping Only the Words with Counts Greater Than or Equal to 60

In [10]:
filtered_counts = word_counts.filter(lambda item: item[1] >= 60)

Sorting in Descending Order and Displaying the Results

  • At this point, we’ve specified all the steps to count the words
  • When you call an RDD's collect method, Spark
    • initiates the processing steps
    • returns a list containing the final results — word-count tuples
  • Though everything in this example executes on one computer, in a real cluster Spark distributes the tasks among the cluster’s worker nodes
In [11]:
from operator import itemgetter
sorted_items = sorted(filtered_counts.collect(),
                      key=itemgetter(1), reverse=True)

Sorting and Displaying the Results (cont.)

  • We determine the word with the most letters so we can right-align the words
In [12]:
max_len = max([len(word) for word, count in sorted_items])
for word, count in sorted_items:
    print(f'{word:>{max_len}}: {count}')
   romeo: 298
    thou: 277
  juliet: 178
     thy: 170
   nurse: 146
 capulet: 141
    love: 136
    thee: 135
   shall: 110
    lady: 109
   friar: 104
    come: 94
mercutio: 83
    good: 80
benvolio: 79
   enter: 75
      go: 75
    i’ll: 71
  tybalt: 69
   death: 69
   night: 68
lawrence: 67
     man: 65
    hath: 64
     one: 60
In [13]:
# terminate current SparkContext so we can create another for next example
sc.stop()  

17.6.4 Spark Word Count on Microsoft Azure

  • In this section, you’ll implement the Spark word-count example on a Microsoft Azure HDInsight Spark cluster

Create an Apache Spark Cluster in HDInsight Using the Azure Portal

  • How to set up a Spark cluster using the HDInsight service
  • While following the Create an HDInsight Spark cluster steps, note the same issues we listed in the Hadoop cluster setup earlier
  • For the Cluster type select Spark
  • Default cluster configuration provides more resources than you need.
  • In the Cluster summary, perform the steps shown in the Hadoop cluster setup to change the number of worker nodes to 2 and to configure the worker and head nodes to use D3 v2 computers
  • When you click Create, it takes 20 to 30 minutes to configure and deploy your cluster

Install Libraries into a Cluster

  • If your Spark code requires libraries that are not installed in the HDInsight cluster, you’ll need to install them
  • To see what libraries are installed by default, you can use ssh to log into your cluster (as shown earlier) and execute the command:
    /usr/bin/anaconda/envs/py35/bin/conda list
  • Since your code will execute on multiple cluster nodes, libraries must be installed on every node
  • Azure requires you to create a Linux shell script that specifies the commands to install the libraries
  • When you submit that script to Azure, it validates the script, then executes it on every node

Install Libraries into a Cluster (cont.)

  • Linux shell scripts are beyond this book’s scope, and the script must be hosted on a web server from which Azure can download the file
  • So, we created an install script for you that installs the libraries we use in the Spark examples
  • Perform the following steps to install these libraries:
    1. In the Azure portal, select your cluster.
    2. In the list of items under the cluster’s search box, click Script Actions.
    3. Click Submit new to configure the options for the library installation script. For the Script type select Custom, for the Name specify libraries and for the Bash script URI use: http://deitel.com/bookresources/IntroToPython/install_libraries.sh
    4. Check both Head and Worker to ensure that the script installs the libraries on all the nodes.
    5. Click Create.
  • When the cluster finishes executing the script, if it executed successfully, you’ll see a green check next to the script name in the list of script actions.
  • Otherwise, Azure will notify you that there were errors.

Copying RomeoAndJuliet.txt to the HDInsight Cluster

  • As you did in the Hadoop demo, use scp to upload RomeoAndJuliet.txt
    scp RomeoAndJuliet.txt sshuser@YourClusterName-ssh.azurehdinsight.net:
  • Replace YourClusterName with the name you specified when creating your cluster and press Enter only when you’ve typed the entire command
  • The colon after the host name is required; it indicates that the file will be copied to your home folder on the cluster, and you’ll be prompted for your cluster password
  • At that prompt, type the password you specified when setting up the cluster, then press Enter

Copying RomeoAndJuliet.txt to the HDInsight Cluster (cont.)

  • Use ssh to log into your cluster and access its command line
    ssh sshuser@YourClusterName-ssh.azurehdinsight.net
  • Replace YourClusterName with your cluster name.
  • Again, you’ll be prompted for your cluster password
  • To work with the RomeoAndJuliet.txt file in Spark, use ssh to copy the file into the cluster’s Hadoop file system by executing the following command
    hadoop fs -copyFromLocal RomeoAndJuliet.txt 
      /example/data/RomeoAndJuliet.txt
  • We use the existing folder /example/data that Microsoft includes for use with HDInsight tutorials
  • Again, press Enter only when you’ve typed the entire command

Accessing Jupyter Notebooks in HDInsight

  • At the time of this writing, HDInsight uses the old Jupyter Notebook interface, rather than the newer JupyterLab interface shown earlier.
  • A quick overview of the old interface
  • To access Jupyter Notebooks in HDInsight, in the Azure portal select All resources, then your cluster
  • In the Overview tab, select Jupyter notebook under Cluster dashboards
  • Opens a web browser window and asks you to log in
  • Use the username and password you specified when setting up the cluster
    • If you did not specify a username, the default is admin.
  • Once you log in, Jupyter displays a folder containing PySpark and Scala subfolders
  • These contain Python and Scala Spark tutorials

Uploading the RomeoAndJulietCounter.ipynb Notebook

  • You can create new notebooks by clicking New and selecting PySpark3, or you can upload existing notebooks from your computer.
  • For this example, let’s upload the previous section’s RomeoAndJulietCounter.ipynb notebook and modify it to work with Azure.
  • To do so, click the Upload button, navigate to the ch17 example folder’s SparkWordCount folder, select RomeoAndJulietCounter.ipynb and click Open
  • Displays the file in the folder with an Upload button to its right
    • Click that button to place the notebook in the current folder
  • Next, click the notebook’s name to open it in a new browser tab
  • Jupyter will display a Kernel not found dialog
  • Select PySpark3 and click OK
  • Do not run any cells yet

Modifying the Notebook to Work with Azure

  • Perform the following steps, executing each cell as you complete the step
  1. The HDInsight cluster will not allow NLTK to store the downloaded stop words in NLTK’s default folder because it’s part of the system’s protected folders
    • In the first cell, modify the call nltk.download('stopwords') as follows to store the stop words in the current folder ('.'):
      nltk.download('stopwords', download_dir='.')
      
  • When you execute the first cell, Starting Spark application appears below the cell while HDInsight sets up a SparkContext object named sc for you
  • When this task is complete, the cell’s code executes and downloads the stop words

Modifying the Notebook to Work with Azure (cont.)

  2. In the second cell, before loading the stop words, you must tell NLTK that they’re located in the current folder
    • Add the following statement after the import statement to tell NLTK to search for its data in the current folder:
      nltk.data.path.append('.')
      
  3. Because HDInsight sets up the SparkContext object for you, the third and fourth cells of the original notebook are not needed, so you can delete them
    • Click inside each cell and select Delete Cells from Jupyter’s Edit menu, or click in the white margin to the cell’s left and type dd

Modifying the Notebook to Work with Azure (cont.)

  4. In the next cell, specify the location of RomeoAndJuliet.txt in the underlying Hadoop file system
    • Replace the string 'RomeoAndJuliet.txt' with the string
      'wasb:///example/data/RomeoAndJuliet.txt'
      
    • The notation wasb:/// indicates that RomeoAndJuliet.txt is stored in a Windows Azure Storage Blob (WASB)—Azure’s interface to the HDFS file system
  5. Azure currently uses Python 3.5.x, so it does not support f-strings
    • In the last cell, replace the f-string with the following older-style Python string formatting using the string method format:
      print('{:>{width}}: {}'.format(word, count, width=max_len))
      

Modifying the Notebook to Work with Azure (cont.)

  • You’ll see the same final results as in the previous section
  • Caution: Be sure to delete your cluster and other resources when you’re done with them, so you do not incur charges

    https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal

  • When you delete your Azure resources, your notebooks will be deleted as well
    • You can download the notebook you just executed by selecting File > Download as > Notebook (.ipynb) in Jupyter

©1992–2020 by Pearson Education, Inc. All Rights Reserved. This content is based on Chapter 17 of the book Intro to Python for Computer Science and Data Science: Learning to Program with AI, Big Data and the Cloud.

DISCLAIMER: The authors and publisher of this book have used their best efforts in preparing the book. These efforts include the development, research, and testing of the theories and programs to determine their effectiveness. The authors and publisher make no warranty of any kind, expressed or implied, with regard to these programs or to the documentation contained in these books. The authors and publisher shall not be liable in any event for incidental or consequential damages in connection with, or arising out of, the furnishing, performance, or use of these programs.