Big Data Analysis Using PySpark
Learning Objectives
- Introduction to PySpark
- Understanding RDD, MapReduce
- Sample Project - Movie Review Analysis
Why Spark
- Lightning Fast Processing
- Real Time Stream Processing
- Easy Integration with Hadoop
- Ability to perform “in-memory” processing
This tutorial introduces each component and method in the order it appears in the sample project (keep lazy evaluation in mind as you follow along).
Spark Initialization
Spark Context - It runs in the driver program and coordinates the independent sets of processes that carry out the work.
Defining Spark Context
import findspark
findspark.init()   # locate the local Spark installation

import pyspark
sc = pyspark.SparkContext.getOrCreate()   # create (or reuse) the SparkContext
We have created a Spark context instance, referred to as "sc".
Creating an RDD from a file
Reading data from a CSV file and creating an RDD
Here we will learn to create an RDD from a file, starting by reading the data provided in the CSV file.
data = sc.textFile("your_file_path/reviews.csv")   # replace with the actual path to reviews.csv
data = data.map(lambda line: line.split(","))
data
Output: PythonRDD[6] at RDD at PythonRDD.scala:48
The code above reads the "reviews.csv" file and creates an RDD object, "data".
It then splits each line into fields using the ',' delimiter.
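Because of lazy evaluation, nothing has actually been read from disk yet. As a small hedged sketch, the take() action can be used to peek at the first parsed rows (this assumes each CSV row has two fields: a 'true'/'false' label and the review text):
for row in data.take(2):    # take() is an action, so it triggers the read
    print(row)              # each row is a list such as ['true', 'some review text ...'] (illustrative only)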
Understanding RDD
Resilient Distributed Datasets (RDDs) are the fundamental data structure in Spark.
Features
- Lazy Evaluation: RDDs do nothing until an action is called. We can keep adding as many transformations as we want, but nothing is actually computed until we call an action (see the short sketch after this list).
- Distributed: an RDD is distributed across the RAM (memory) of many machines.
- Each RDD object is a collection of elements that can hold different data structures such as dictionaries, tuples, lists, etc.
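A minimal sketch of lazy evaluation on a small in-memory list (separate from the project data; sc.parallelize is covered in more detail later):
numbers = sc.parallelize([1, 2, 3, 4])    # distribute a small Python list
squared = numbers.map(lambda n: n * n)    # transformation only: nothing is computed yet
print(squared.collect())                  # the collect() action triggers the work: [1, 4, 9, 16]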
Filtering Opinions
Separating the data into positive and negative sets for later use, based on the first element of each tuple (assuming 'true' indicates a positive review and 'false' a negative one).
pos_words = data.filter(lambda x: x[0] == 'true')\
                .flatMap(lambda x: x[1].split())
neg_words = data.filter(lambda x: x[0] == 'false')\
                .flatMap(lambda x: x[1].split())
The code above uses the filter function to separate the data based on the value of the first element of each tuple.
<Note> - the lines above also return RDDs, so two more RDDs, pos_words and neg_words, are created.
Lambda Operator: a concise way to create small anonymous functions, i.e. functions without a name.
Refer to this link for more details: http://www.python-course.eu/lambda.php
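For example, the two definitions below are equivalent; the lambda form is simply a compact way to pass a throwaway function to map or filter (plain Python, independent of Spark):
def add_one(x):                        # a named function ...
    return x + 1

add_one_lambda = lambda x: x + 1       # ... and its anonymous lambda equivalent

print(add_one(4), add_one_lambda(4))   # 5 5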
Removing StopWords
Now we will remove stopwords to keep only the relevant and useful words from our list.
import re
from nltk.corpus import stopwords   # requires nltk.download('stopwords') to have been run once

stop = stopwords.words('english')

# function to get the non-trivial words from a string
def purify(str_line):
    str_line = re.sub(r'[^\w\s]+', ' ', str_line)                      # replace punctuation with spaces
    str_line = re.sub(r'\s+', ' ', str_line)                           # collapse repeated whitespace
    str_line = re.sub(r"'", '', str_line)                              # drop apostrophes
    str_line = re.sub(r'(\b[A-Za-z] \b|\b [A-Za-z]\b)', '', str_line)  # drop stranded single letters
    str_words = [j for j in str_line.split() if j not in stop]         # remove stopwords
    return str_words
The function above strips punctuation, stray single letters, and stopwords.
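A quick, hypothetical check of the function on a made-up sentence (not part of the dataset):
sample = "this movie was not good, it was great!"   # illustrative input only
print(purify(sample))   # stopwords such as 'this', 'was', 'not', 'it' are dropped, e.g. ['movie', 'good', 'great']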
pos_word_list = str(pos_words.collect())
pos_word_list = purify(pos_word_list)
neg_word_list = str(neg_words.collect())
neg_word_list = purify(neg_word_list)
The code above collects each RDD back to the driver and converts the result into a single string, which purify then cleans.
The resulting Python lists (pos_word_list and neg_word_list) contain all the non-stopword words from reviews marked positive and negative, respectively.
Recreate RDD
RDDs can be created in two ways:
- Referencing a dataset in an external storage system (implemented earlier when we created an RDD from the .csv file).
- Parallelizing an existing collection in your driver program (implemented below).
pos_words = sc.parallelize(pos_word_list)
neg_words = sc.parallelize(neg_word_list)
The code above creates RDDs from the Python lists (used as the driver-side collections).
Finding the top (most used) positive words
from operator import add

counts = pos_words\
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)
The code above finds the frequency of each word. The lambda function maps each word to a (word, 1) pair, and reduceByKey then merges the values of identical keys using the associative add function.
<note> - reduceByKey only works on RDDs whose elements are "key -> value" pairs.
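As a minimal, self-contained sketch of how reduceByKey merges the values of identical keys (separate from the project data):
from operator import add

pairs = sc.parallelize([("spark", 1), ("hadoop", 1), ("spark", 1)])
word_counts = pairs.reduceByKey(add)   # values sharing a key are summed
print(word_counts.collect())           # [('spark', 2), ('hadoop', 1)] (partition order may vary)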
Sorting
Now we have the count (frequency) of each word (excluding stopwords). The next step is to sort the words by frequency.
Let's see the following code for sorting!
reversed_map = counts.map(lambda kv: (kv[1], kv[0])).sortByKey(False)
original_map = reversed_map.map(lambda kv: (kv[1], kv[0]))
original_map.take(5)
Output:
[('like', 9350),
 ('one', 5459),
 ('great', 4560),
 ('show', 3437),
 ('good', 3119)]
Understanding the above code:
Step 1: use a lambda function to reverse the map so that it has the form
{Frequency (Key) -> Word (Value)}.
Step 2: the sortByKey function sorts the RDD by key (the first element of each pair), which is why the map was reversed in Step 1. Passing "False" to sortByKey indicates sorting in decreasing order.
Step 3: reverse the map again to recover the original structure, i.e.
{Word (Key) -> Frequency (Value)}.
Step 4: pick the first 5 entries from the sorted map using the RDD method take(n), where n is the number of elements to take from the beginning.
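As an alternative sketch (not the approach used above), the double reversal can be avoided with the standard RDD methods sortBy and takeOrdered, which sort on a computed key directly:
top5_sorted = counts.sortBy(lambda kv: kv[1], ascending=False).take(5)   # sort by the count, descending
top5_direct = counts.takeOrdered(5, key=lambda kv: -kv[1])               # or fetch the top 5 in one call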
Try it yourself!
The basic Spark functionality has been implemented. The results can be visualized using other Python libraries and packages, for example as in the sketch below.
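For example, a minimal matplotlib sketch of the most frequent positive words (assuming matplotlib is installed; the variable original_map comes from the sorting step above):
import matplotlib.pyplot as plt

top_words = original_map.take(10)    # list of (word, count) pairs
words = [w for w, c in top_words]
frequencies = [c for w, c in top_words]

plt.bar(words, frequencies)          # simple bar chart of word frequencies
plt.xlabel("Word")
plt.ylabel("Frequency")
plt.title("Most frequent words in positive reviews")
plt.show()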