17.5 Hadoop

  • Apache Hadoop—a key big-data infrastructure technology that also serves as the foundation for many recent advancements in big-data processing and an entire ecosystem of software tools that are continually evolving to support today’s big-data needs

17.5.1 Hadoop Overview

  • When Google was launched in 1998, the amount of online data was already enormous with approximately 2.4 million websites—truly big data.
  • Today there are nearly two billion websites (almost a thousandfold increase) and Google is handling over two trillion searches per year! Having used Google search since its inception, our sense is that today’s responses are significantly faster.
  • When Google was developing their search engine, they knew that they needed to return search results quickly.
  • The only practical way to do this was to store and index the entire Internet using a clever combination of secondary storage and main memory.

17.5.1 Hadoop Overview (cont.)

  • Computers of that time couldn’t hold that amount of data economically and could not analyze that amount of data fast enough to guarantee prompt search-query responses.
  • So Google developed a clustering system, tying together vast numbers of commodity computers—called nodes.
  • Because having more computers and more connections between them meant greater chance of hardware failures, they also built in high levels of redundancy to ensure that the system would continue functioning even if nodes within clusters failed.
  • The data was distributed across all these inexpensive commodity computers.
  • To satisfy a search request, all the computers in the cluster searched their locally stored portions of the web in parallel.
  • Then the results of those searches were gathered up and reported back to the user.

17.5.1 Hadoop Overview (cont.)

  • To accomplish this, Google needed to develop the clustering hardware and software, including distributed storage.
  • Google published its designs, but did not open source its software.
  • Programmers at Yahoo!, working from Google’s designs in the “Google File System” paper, then built their own system.
  • They open-sourced their work and the Apache organization implemented the system as Hadoop.
  • The name came from an elephant stuffed animal that belonged to a child of one of Hadoop’s creators.

17.5.1 Hadoop Overview (cont.)

HDFS, MapReduce and YARN

  • Hadoop’s key components are:
    • HDFS (Hadoop Distributed File System) for storing massive amounts of data throughout a cluster, and
    • MapReduce for implementing the tasks that process the data.
  • Hadoop MapReduce is similar in concept to filter/map/reduce in functional-style programming, just on a massively parallel scale.
  • A MapReduce task performs two steps—mapping and reduction.
    • The mapping step, which also may include filtering, processes the original data across the entire cluster and maps it into tuples of key–value pairs.
    • The reduction step then combines those tuples to produce the results of the MapReduce task.
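  • As a minimal, single-process sketch in ordinary Python (not Hadoop code), the two steps might look like this for the word-length example used later in this section: the mapping step produces (length, 1) pairs, and the reduction step groups the pairs by key and totals them:

from itertools import groupby
from operator import itemgetter

text = 'this is a tiny example of mapping and reducing'

# mapping step: map each word to a (word_length, 1) key-value pair
pairs = [(len(word), 1) for word in text.split()]

# reduction step: group the sorted pairs by key and total the counts
for length, group in groupby(sorted(pairs), key=itemgetter(0)):
    print(length, sum(count for _, count in group))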

HDFS, MapReduce and YARN (cont.)

  • Hadoop divides the data into batches that it distributes across the nodes in the cluster—anywhere from a few nodes to a Yahoo! cluster with 40,000 nodes and over 100,000 cores.
  • Hadoop also distributes the MapReduce task’s code to the nodes in the cluster and executes the code in parallel on every node.
  • Each node processes only the batch of data stored on that node.
  • The reduction step combines the results from all the nodes to produce the final result.
  • To coordinate this, Hadoop uses YARN (“yet another resource negotiator”) to manage all the resources in the cluster and schedule tasks for execution.
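  • The following toy sketch (ordinary Python using the multiprocessing module, not how Hadoop actually distributes work) mimics that idea on one machine: the data is split into batches, each batch is processed independently, as a node would process its locally stored data, and the partial results are then combined into the final result:

from collections import Counter
from multiprocessing import Pool

def process_batch(lines):
    """Count word lengths in one batch, as a single node would."""
    return Counter(len(word) for line in lines for word in line.split())

if __name__ == '__main__':
    lines = ['but soft what light through yonder window breaks',
             'it is the east and juliet is the sun',
             'arise fair sun and kill the envious moon']
    batches = [[line] for line in lines]  # one batch per simulated "node"

    with Pool() as pool:  # process the batches in parallel
        partial_results = pool.map(process_batch, batches)

    print(sum(partial_results, Counter()))  # combine into the final result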

Hadoop Ecosystem

  • The Hadoop ecosystem includes Spark (discussed in Sections 16.6–16.7) and many other Apache projects:[1],[2],[3]
  • Ambari (https://ambari.apache.org)—Tools for managing Hadoop clusters.
  • Drill (https://drill.apache.org)—SQL querying of non-relational data in Hadoop and NoSQL databases.

Hadoop Ecosystem (cont.)

  • Flume (https://flume.apache.org)—A service for collecting and storing (in HDFS and other storage) streaming event data, like high-volume server logs, IoT messages and more.
  • HBase (https://hbase.apache.org)—A NoSQL database for big data with "billions of rows by millions of columns—atop clusters of commodity hardware." (We used the word “by” to replace “X” in the original quote.)
  • Hive (https://hive.apache.org)—Uses SQL to interact with data in data warehouses. A data warehouse aggregates data of various types from various sources. Common operations include extracting data, transforming it and loading it (known as ETL) into another database, typically so you can analyze it and create reports from it.
  • Impala (https://impala.apache.org)—A database for real-time SQL-based queries across distributed data stored in Hadoop HDFS or HBase.

Hadoop Ecosystem (cont.)

  • Kafka (https://kafka.apache.org)—Real-time messaging, stream processing and storage, typically to transform and process high-volume streaming data, such as website activity and streaming IoT data.
  • Pig (https://pig.apache.org)—A scripting platform that converts data analysis tasks from a scripting language called Pig Latin into MapReduce tasks.
  • Sqoop (https://sqoop.apache.org)—Tool for moving structured, semi-structured and unstructured data between databases.
  • Storm (https://storm.apache.org)—A real-time stream-processing system for tasks such as data analytics, machine learning, ETL and more.
  • ZooKeeper (https://zookeeper.apache.org)—A service for managing cluster configurations and coordination between clusters.
  • And more.

Hadoop Providers

  • Numerous cloud vendors provide Hadoop as a service, including Amazon EMR, Google Cloud DataProc, IBM Watson Analytics Engine, Microsoft Azure HDInsight and others.
  • In addition, companies like Cloudera and Hortonworks (which at the time of this writing are merging) offer integrated Hadoop-ecosystem components and tools via the major cloud vendors.
  • They also offer free downloadable environments that you can run on the desktop for learning, development and testing before you commit to cloud-based hosting, which can incur significant costs.
    • Check their significant system requirements first to ensure that you have the disk space and memory required to run them.
  • We introduce MapReduce programming in the following sections’ example, using a cloud-based Microsoft Azure HDInsight cluster, which provides Hadoop as a service.

Hadoop 3

  • Apache continues to evolve Hadoop. Hadoop 3 was released in December of 2017 with many improvements, including better performance and improved storage efficiency.

17.5.2 Summarizing Word Lengths in Romeo and Juliet via MapReduce

  • In the next several subsections, you’ll create a cloud-based, multi-node cluster of computers using Microsoft Azure HDInsight.
  • Then, you’ll use the service’s capabilities to demonstrate Hadoop MapReduce running on that cluster.
  • The MapReduce task you’ll define will determine the length of each word in RomeoAndJuliet.txt (from the “Natural Language Processing” chapter), then summarize how many words of each length there are.
  • After defining the task’s mapping and reduction steps, you’ll submit the task to your HDInsight cluster, and Hadoop will decide how to use the cluster of computers to perform the task.

17.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight

  • Most major cloud vendors have support for Hadoop and Spark computing clusters that you can configure to meet your application’s requirements.
  • Multi-node cloud-based clusters typically are paid services, though most vendors provide free trials or credits so you can try out their services.
  • We want you to experience the process of setting up clusters and using them to perform tasks.
  • So, in this Hadoop example, you’ll use Microsoft Azure’s HDInsight service to create cloud-based clusters of computers in which to test our examples.
  • Sign up for an account

    https://azure.microsoft.com/en-us/free

17.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight (cont.)

  • Microsoft requires a credit card for identity verification.
  • Various services are always free and some you can continue to use for 12 months.
  • For information on these services see:

    https://azure.microsoft.com/en-us/free/free-account-faq/

  • Microsoft also gives you a credit to experiment with their paid services, such as their HDInsight Hadoop and Spark services.
  • Once your credits run out or 30 days pass (whichever comes first), you cannot continue using paid services unless you authorize Microsoft to charge your card.

17.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight (cont.)

For more information on Azure resource groups, see:

https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal

Creating an HDInsight Hadoop Cluster

  • The following link explains how to set up a cluster for Hadoop using the Azure HDInsight service:

    https://docs.microsoft.com/en-us/azure/hdinsight/hadoop/apache-hadoop-linux-create-cluster-get-started-portal

  • While following their Create a Hadoop cluster steps, please note the following:
    • In Step 1, you access the Azure portal by logging into your account at

      https://portal.azure.com

    • In Step 3, you must choose a cluster name that does not already exist. When you enter your cluster name, Microsoft will check whether that name is available and display a message if it is not. You must create a password. For the Resource group, you’ll also need to click Create new and provide a group name. Leave all other settings in this step as is.
    • In Step 5, under Select a Storage account, click Create new and provide a storage account name containing only lowercase letters and numbers. Like the cluster name, the storage account name must be unique.

Creating an HDInsight Hadoop Cluster (cont.)

  • When you get to the Cluster summary you’ll see that Microsoft initially configures the cluster as Head (2 x D12 v2), Worker (4 x D4 v2).
  • At the time of this writing, the estimated cost-per-hour for this configuration was $3.11.
  • This setup uses a total of 6 CPU nodes with 40 cores—far more than we need for demonstration purposes.
  • You can edit this setup to use fewer CPUs and cores, which also saves money.
  • Let’s change the configuration to a four-CPU cluster with 16 cores that uses less powerful computers.

Creating an HDInsight Hadoop Cluster (cont.)

  • In the Cluster summary:
    1. Click Edit to the right of Cluster size.
    2. Change the Number of Worker nodes to 2.
    3. Click Worker node size, then View all, select D3 v2 (this is the minimum CPU size for Hadoop nodes) and click Select.
    4. Click Head node size, then View all, select D3 v2 and click Select.
    5. Click Next and click Next again to return to the Cluster summary. Microsoft will validate the new configuration.
    6. When the Create button is enabled, click it to deploy the cluster.
  • It takes 20–30 minutes for Microsoft to “spin up” your cluster.

Creating an HDInsight Hadoop Cluster (cont.)

  • During this time, Microsoft is allocating all the resources and software the cluster requires.
  • After the changes above, our estimated cost for the cluster was $1.18 per hour, based on average use for similarly configured clusters.
  • Our actual charges were less than that.
  • If you encounter any problems configuring your cluster, Microsoft provides HDInsight chat-based support.

17.5.4 Hadoop Streaming

  • Hadoop is Java-based
  • Languages like Python that are not natively supported must use Hadoop streaming
  • Python MapReduce scripts communicate with Hadoop via redirected standard I/O streams
    • Hadoop redirects input to mapper script, which reads input from standard input stream
    • Mapper writes results to standard output stream
    • Hadoop redirects mapper’s output as input to reducer script, which reads from the standard input stream
    • Reducer writes results to standard output stream
    • Hadoop writes reducer’s output to HDFS

17.5.5 Implementing the Mapper

  • Mapper takes lines of text as input and maps them to key–value pairs, each containing a word length and 1
  • Reducer will total these key–value pairs by key
  • Hadoop streaming expects mapper’s output and reducer’s input/output to have the form

    key\tvalue

  • In length_mapper.py, #! tells Hadoop to use Python 3
    • Must be first line in the file
    • HDInsight currently includes Python 2.7.12 and Python 3.5.2
    • Cannot use f-strings which are Python 3.6+

17.5.5 Implementing the Mapper (cont.)

#!/usr/bin/env python3
# length_mapper.py
"""Maps lines of text to key-value pairs of word lengths and 1."""
import sys

def tokenize_input():  # generator function
    """Split each line of standard input into a list of strings."""
    for line in sys.stdin:
        yield line.split()  

# read each line of standard input and, for every word,
# produce a key-value pair containing the word's length, a tab and 1
for line in tokenize_input():
    for word in line:
        print(str(len(word)) + '\t1')

17.5.6 Implementing the Reducer

  • In length_reducer.py, function tokenize_input is a generator function that reads and splits the key–value pairs produced by the mapper
  • MapReduce streaming supplies the standard input
  • groupby function (itertools module) groups inputs by their keys (the word lengths)
  • Total all the counts for a given key
  • Output a new key–value pair consisting of the word length and its total
  • MapReduce takes the reducer’s final output (the word lengths and their totals) and writes it to a file in HDFS
#!/usr/bin/env python3
# length_reducer.py
"""Counts the number of words with each length."""
import sys
from itertools import groupby
from operator import itemgetter

def tokenize_input():
    """Split each line of standard input into a key and a value."""
    for line in sys.stdin:
        yield line.strip().split('\t')

# produce key-value pairs of word lengths and counts separated by tabs
for word_length, group in groupby(tokenize_input(), itemgetter(0)):
    try:
        total = sum(int(count) for word_length, count in group)
        print(word_length + '\t' + str(total))
    except ValueError:
        pass  # ignore word if its count was not an integer
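
  • Optionally (this is not one of the book’s steps), you can sanity-check the mapper and reducer locally before involving Hadoop. In a macOS/Linux Terminal, or later in the cluster’s ssh session, with the two scripts and RomeoAndJuliet.txt in the current folder, pipe the file through the mapper, sort and the reducer; the sort command stands in for the shuffle-and-sort phase in which Hadoop streaming groups and orders the mapper’s output by key:

cat RomeoAndJuliet.txt | python3 length_mapper.py | sort -n | python3 length_reducer.py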

17.5.7 Preparing to Run the MapReduce Example

  • Next, you’ll upload files to the cluster so you can execute the example.
  • In a Command Prompt, Terminal or shell, change to the folder containing your mapper and reducer scripts and the RomeoAndJuliet.txt file.
  • We assume all three are in this chapter’s ch16 examples folder, so be sure to copy your RomeoAndJuliet.txt file to this folder first.

Copying the Script Files to the HDInsight Hadoop Cluster

  • Enter the following command to upload the files.
  • Be sure to replace YourClusterName with the cluster name you specified when setting up the Hadoop cluster and press Enter only after you’ve typed the entire command.
  • The colon in the following command is required and indicates that you’ll supply your cluster password when prompted.
  • At that prompt, type the password you specified when setting up the cluster, then press Enter:

scp length_mapper.py length_reducer.py RomeoAndJuliet.txt sshuser@YourClusterName-ssh.azurehdinsight.net:

  • The first time you do this, you’ll be asked for security reasons to confirm that you trust the target host (that is, Microsoft Azure).

Copying RomeoAndJuliet into the Hadoop File System

  • For Hadoop to read the contents of RomeoAndJuliet.txt and supply the lines of text to your mapper, you must first copy the file into Hadoop’s file system.
  • First, you must use ssh to log into your cluster and access its command line.
  • If you had to install ssh on your system, log out and log back in or restart your system to enable it.
  • In a Command Prompt, Terminal or shell, execute the following command.
  • Be sure to replace YourClusterName with your cluster name.
  • Again, you’ll be prompted for your cluster password:
ssh sshuser@YourClusterName-ssh.azurehdinsight.net
  • For this example, we’ll use the following Hadoop command to copy the text file into the already existing folder /example/data that the cluster provides for use with Microsoft’s Azure Hadoop tutorials.
  • Again, press Enter only when you’ve typed the entire command:
hadoop fs -copyFromLocal RomeoAndJuliet.txt /example/data/RomeoAndJuliet.txt
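  • If you’d like to confirm that the file arrived (an optional check, not part of the tutorial’s steps), you can list the folder’s contents with:

hadoop fs -ls /example/data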

17.5.8 Running the MapReduce Job

  • Once the files are in the cluster, run the MapReduce job for RomeoAndJuliet.txt by executing the following command in the cluster’s ssh session
    • You can copy/paste the command from yarn.txt located with this example
    • We reformatted the command here for readability:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar 
   -D mapred.output.key.comparator.class=
      org.apache.hadoop.mapred.lib.KeyFieldBasedComparator    
   -D mapred.text.key.comparator.options=-n   
   -files length_mapper.py,length_reducer.py    
   -mapper length_mapper.py 
   -reducer length_reducer.py    
   -input /example/data/RomeoAndJuliet.txt    
   -output /example/wordlengthsoutput

17.5.8 Running the MapReduce Job (cont.)

  • The yarn command invokes Hadoop’s YARN (“yet another resource negotiator”) tool to manage and coordinate access to the Hadoop resources the MapReduce task uses
  • hadoop-streaming.jar contains the Java-based Hadoop streaming utility that allows you to use Python to implement the mapper and reducer
  • The two -D options set Hadoop properties that enable it to
    • sort the final key–value pairs by key (KeyFieldBasedComparator)
    • in numerical order (-n) rather than alphabetically

17.5.8 Running the MapReduce Job (cont.)

  • Other command-line arguments:
    • -files—Comma-separated list of scripts that Hadoop copies to every node in the cluster so they can execute locally on each node
    • -mapper—mapper’s script file
    • -reducer—reducer’s script file
    • -input—File or directory of files to supply as mapper input
    • -output—HDFS directory where final results will be stored
      • Error if this folder already exists

17.5.8 Running the MapReduce Job (cont.)

  • Sample output shows some Hadoop feedback produced as the MapReduce job executes
    • Used ... to save space
  • Several lines of interest:
    • The total number of “input paths to process”—the 1 source of input in this example is RomeoAndJuliet.txt
    • The “number of splits”—2 in this example, based on the number of worker nodes in our HDInsight cluster
    • The percentage completion dates and times—big data jobs could take minutes, hours, days, ...
    • File System Counters showing numbers of bytes read and written
    • Job Counters showing the numbers of mapping and reduction tasks used
    • Map-Reduce Framework showing stats about the steps performed

Output

packageJobJar: [] [/usr/hdp/2.6.5.3004-13/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.3004-13.jar] /tmp/streamjob2764990629848702405.jar tmpDir=null
...
18/12/05 16:46:25 INFO mapred.FileInputFormat: Total input paths to process : 1
18/12/05 16:46:26 INFO mapreduce.JobSubmitter: number of splits:2
...
18/12/05 16:46:26 INFO mapreduce.Job: The url to track the job: http://hn0-paulte.y3nghy5db2kehav5m0opqrjxcb.cx.internal.cloudapp.net:8088/proxy/application_1543953844228_0025/
...
18/12/05 16:46:35 INFO mapreduce.Job:  map 0% reduce 0%
18/12/05 16:46:43 INFO mapreduce.Job:  map 50% reduce 0%
18/12/05 16:46:44 INFO mapreduce.Job:  map 100% reduce 0%
18/12/05 16:46:48 INFO mapreduce.Job:  map 100% reduce 100%
18/12/05 16:46:50 INFO mapreduce.Job: Job job_1543953844228_0025 completed successfully
18/12/05 16:46:50 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=156411
        FILE: Number of bytes written=813764
...
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
...
    Map-Reduce Framework
        Map input records=5260
        Map output records=25956
        Map output bytes=104493
        Map output materialized bytes=156417
        Input split bytes=346
        Combine input records=0
        Combine output records=0
        Reduce input groups=19
        Reduce shuffle bytes=156417
        Reduce input records=25956
        Reduce output records=19
        Spilled Records=51912
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=193
        CPU time spent (ms)=4440
        Physical memory (bytes) snapshot=1942798336
        Virtual memory (bytes) snapshot=8463282176
        Total committed heap usage (bytes)=3177185280
...
18/12/05 16:46:50 INFO streaming.StreamJob: Output directory: /example/wordlengthsoutput

Viewing the Word Counts

  • Hadoop MapReduce saves its output into HDFS
  • To view the final word counts, you must look at the file in the cluster’s HDFS:
    hdfs dfs -text /example/wordlengthsoutput/part-00000
18/12/05 16:47:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
18/12/05 16:47:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev b5efb3e531bc1558201462b8ab15bb412ffa6b89]
1   4699
2   3869
3   5651
4   3668
5   2719
6   1624
7   1140
8   1062
9   855
10  317
11  189
12  95
13  35
14  13
15  9
16  6
17  3
18  1
23  1

IMPORTANT: Deleting Your Cluster So You Do Not Incur Charges

  • When you’re finished with your cluster, be sure to delete it and its associated resources (such as its storage account) so you do not incur additional charges.

©1992–2020 by Pearson Education, Inc. All Rights Reserved. This content is based on Chapter 5 of the book Intro to Python for Computer Science and Data Science: Learning to Program with AI, Big Data and the Cloud.

DISCLAIMER: The authors and publisher of this book have used their best efforts in preparing the book. These efforts include the development, research, and testing of the theories and programs to determine their effectiveness. The authors and publisher make no warranty of any kind, expressed or implied, with regard to these programs or to the documentation contained in these books. The authors and publisher shall not be liable in any event for incidental or consequential damages in connection with, or arising out of, the furnishing, performance, or use of these programs.