PySpark Programming
Python and Apache Spark are two of the most widely used tools in the analytics industry. Apache Spark is a popular open-source framework for fast, large-scale data processing, and it supports several languages, including Scala, Python, Java, and R. Which one you use comes down to your language preference and the scope of your work. In this PySpark programming article, I will talk about Spark with Python and demonstrate how Python leverages the functionality of Apache Spark.
PySpark is the combination of Apache Spark and Python.
Apache Spark is an open-source cluster-computing framework built around speed, ease of use, and streaming analytics, whereas Python is a general-purpose, high-level programming language that provides a wide range of libraries and is widely used for machine learning and real-time streaming analytics.
In other words, PySpark is a Python API for Spark that lets you harness the simplicity of Python and the power of Apache Spark in order to tame Big Data.
You might be wondering why I chose Python to work with Spark when other languages are available. To answer this, I have listed a few of the advantages that you will enjoy with Python:
- Python is very easy to learn and implement.
- It provides a simple and comprehensive API.
- With Python, code readability, maintainability, and familiarity are far better.
- It provides various options for data visualization, which is difficult using Scala or Java.
- Python comes with a wide range of libraries such as NumPy, pandas, scikit-learn, seaborn, and Matplotlib.
- It is backed up by a huge and active community.
Now that you know the advantages of PySpark programming, let’s simply dive into the fundamentals of PySpark.
Resilient Distributed Datasets (RDDs)
RDDs are the building blocks of any Spark application. RDD stands for:
- Resilient: It is fault tolerant and is capable of rebuilding data on failure.
- Distributed: Data is distributed among the multiple nodes in a cluster.
- Dataset: Collection of partitioned data with values.
An RDD is an abstraction layer over a distributed collection of data. It is immutable, and its transformations are evaluated lazily.
With RDDs, you can perform two types of operations:
- Transformations: These operations are applied to create a new RDD.
- Actions: These operations are applied on an RDD to instruct Apache Spark to apply computation and pass the result back to the driver.
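To make the difference between transformations and actions concrete, here is a minimal plain-Python sketch. It is an analogy only and does not use PySpark: `LazyDataset` is a hypothetical class I made up for illustration, not a Spark API. Transformations only record what should be done and return a new object, while the action (`collect`) actually runs the work and hands the result back.

```python
class LazyDataset:
    """Toy stand-in for an RDD (hypothetical, not a Spark class)."""

    def __init__(self, data, steps=()):
        self._data = tuple(data)    # the source data is never modified
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    def map(self, fn):
        # Transformation: returns a NEW dataset with one more recorded step.
        return LazyDataset(self._data, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also just records the step.
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: runs every recorded step and returns the result to the caller.
        items = iter(self._data)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []  # records when square() is really executed


def square(x):
    calls.append(x)
    return x * x


numbers = LazyDataset(range(1, 6))
evens_squared = numbers.map(square).filter(lambda x: x % 2 == 0)
calls_before_action = list(calls)  # still empty: nothing has run yet
result = evens_squared.collect()   # the action triggers the computation
```

Here `calls_before_action` stays empty because building `evens_squared` only records steps, and `numbers` itself is left unchanged, which mirrors the immutable and lazy behaviour described above.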
DataFrame
A DataFrame in PySpark is a distributed collection of structured or semi-structured data. The data in a DataFrame is stored in rows under named columns, similar to relational database tables or Excel sheets.
It also shares some common attributes with RDDs: it is immutable, lazily evaluated, and distributed. It can be loaded from a wide range of formats such as JSON, CSV, and TXT, and you can also create it from an existing RDD or by programmatically specifying the schema.
PySpark SQL
PySpark SQL is a higher-level abstraction module over PySpark Core. It is mainly used for processing structured and semi-structured datasets. It also provides an optimized API that can read data from various data sources in different file formats. Thus, with PySpark you can process data using SQL as well as HiveQL. Because of this feature, PySpark SQL is gaining popularity among database programmers and Apache Hive users.
PySpark Streaming
PySpark Streaming is a scalable, fault-tolerant system that follows the RDD batch paradigm. It operates on mini-batches, with batch intervals that can range from 500 ms to larger interval windows.
Spark Streaming receives a continuous input data stream from sources like Apache Flume, Kinesis, Kafka, and TCP sockets. The streamed data is then internally broken down into multiple smaller batches based on the batch interval and forwarded to the Spark Engine. The Spark Engine processes these batches using complex algorithms expressed with high-level functions like map, reduce, join, and window. Once the processing is done, the processed batches are pushed out to databases, file systems, and live dashboards.
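The idea of cutting a continuous stream into batches by interval can be sketched in a few lines of plain Python. This is only an illustration of the concept, not how Spark Streaming is implemented: `to_micro_batches` and the sample events are made up for this example, and unlike a real streaming engine it simply skips intervals in which no event arrived.

```python
from collections import defaultdict


def to_micro_batches(events, batch_interval):
    """Group (timestamp_in_seconds, value) events into fixed-length batches."""
    batches = defaultdict(list)
    for timestamp, value in events:
        # Every event falls into the interval that contains its timestamp.
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(value)
    # Return the batches in time order.
    return [batches[i] for i in sorted(batches)]


# Hypothetical events arriving over roughly 1.5 seconds.
events = [(0.1, "a"), (0.4, "b"), (0.6, "c"), (1.2, "d"), (1.4, "e")]

# A 500 ms batch interval, the lower bound mentioned above.
batches = to_micro_batches(events, batch_interval=0.5)

# Each batch is then processed like a small, ordinary dataset.
batch_sizes = [len(batch) for batch in batches]
```

With these sample events, the first two values share a batch, the third sits alone, and the last two share the final batch.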
The key abstraction for Spark Streaming is the Discretized Stream (DStream). DStreams are built on RDDs, which lets Spark developers work within the same context of RDDs and batches to solve streaming problems. Moreover, Spark Streaming integrates with MLlib, SQL, DataFrames, and GraphX, which widens the range of functionality available to you. Being a high-level API, Spark Streaming provides fault-tolerant “exactly-once” semantics for stateful operations.
NOTE: “Exactly-once” semantics means that each event is processed exactly once by all operators in the streaming application, even if a failure occurs.
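To see why this matters, consider what happens when a batch is replayed after a failure. One common way to keep the final result correct, and this is my assumption for illustration rather than a description of Spark's internals, is to make the output idempotent, so that writing the same batch twice has the same effect as writing it once. The `IdempotentSink` class below is hypothetical and written in plain Python.

```python
class IdempotentSink:
    """Keeps one result per batch ID, so a replay overwrites instead of adding."""

    def __init__(self):
        self._results = {}

    def write(self, batch_id, value):
        # Writing the same batch again replaces the earlier entry.
        self._results[batch_id] = value

    def total(self):
        return sum(self._results.values())


sink = IdempotentSink()
sink.write(0, 3)  # batch 0 counted 3 events
sink.write(1, 5)  # batch 1 counted 5 events
sink.write(1, 5)  # batch 1 is replayed after a simulated failure
total = sink.total()
```

Even though batch 1 was written twice, its events contribute to the total only once.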
The diagram below represents the basic components of Spark Streaming.
As you can see, data is ingested into Spark Streaming from various sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets. This data is then processed using complex algorithms expressed with high-level functions like map, reduce, join, and window. Finally, the processed data is pushed out to various file systems, databases, and live dashboards for further use.
I hope this gave you a clear picture of how PySpark Streaming works. Let’s now move on to the last but most enticing topic of this PySpark Programming article, i.e. Machine Learning.
Machine Learning
As you already know, Python is a mature language that has long been heavily used for data science and machine learning. In PySpark, machine learning is facilitated by a Python library called MLlib (Machine Learning Library). It is a wrapper over PySpark Core that performs data analysis using machine-learning algorithms such as classification, clustering, linear regression, and a few more.
One of the enticing features of machine learning with PySpark is that it works on distributed systems and is highly scalable.
MLlib exposes three core machine learning functionalities with PySpark:
- Data Preparation: It provides feature extraction, transformation, selection, hashing, and more.
- Machine Learning Algorithms: It provides popular and advanced regression, classification, and clustering algorithms for machine learning.
- Utilities: It has statistical methods such as chi-square testing and descriptive statistics, as well as linear algebra and model evaluation methods.
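Since hashing is used for feature extraction in the example that follows, here is a rough plain-Python sketch of the idea behind tokenizing text and hashing the tokens into a fixed-size term-frequency vector. It is not MLlib code: the function names are my own, `zlib.crc32` is only a stand-in for the hash function Spark uses (MurmurHash3), and the vector size of 16 is an arbitrary choice that is far smaller than Spark's default.

```python
import zlib


def tokenize(text):
    # Lowercase the text and split it on whitespace.
    return text.lower().split()


def hashing_tf(tokens, num_features=16):
    """Map tokens to a fixed-length vector of term counts via hashing."""
    vector = [0] * num_features
    for token in tokens:
        # The hash decides which slot a token lands in; collisions share a slot.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1
    return vector


# A made-up violation text, only for illustration.
tokens = tokenize("Food not stored at proper temperature food")
features = hashing_tf(tokens)
```

The vector always has the same length no matter how many distinct words appear, which is what lets a model such as logistic regression consume free-form text.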
Let me show you how to implement machine learning with PySpark by building a classifier with logistic regression.
Here, I will perform a simple predictive analysis on food inspection data from the city of Chicago.
## Importing the required libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
## Creating an RDD by importing and parsing the input data
## (sc and spark are assumed to be the SparkContext and SparkSession provided by the pyspark shell)
def csvParse(s):
    import csv
    from io import StringIO
    sio = StringIO(s)
    value = next(csv.reader(sio))
    sio.close()
    return value
food_inspections = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_Chicago_data.csv')\
    .map(csvParse)
## Display data format
food_inspections.take(1)
## Structuring the data
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("results", StringType(), False),
    StructField("violations", StringType(), True)])
## Creating a dataframe and a temporary table (Count_Results) required for the predictive analysis
## The SparkSession (spark) is used to perform transformations on structured data
ins_df = spark.createDataFrame(food_inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])), schema)
ins_df.createOrReplaceTempView('Count_Results')
ins_df.show()
## Let's now understand our dataset
## Show the distinct values in the results column
ins_df.select('results').distinct().show()
## Converting the existing dataframe into a new dataframe
## Each inspection is represented as a label-violations pair
## Here 0.0 represents a failure, 1.0 represents a success, and -1.0 represents some results besides those two
def label_Results(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass with Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
ins_label = UserDefinedFunction(label_Results, DoubleType())
labeled_Data = ins_df.select(ins_label(ins_df.results).alias('label'), ins_df.violations).where('label >= 0')
labeled_Data.take(1)
## Creating a logistic regression model from the input dataframe
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeled_Data)
## Evaluating with test data
test_Data = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_test.csv')\
    .map(csvParse) \
    .map(lambda l: (int(l[0]), l[1], l[12], l[13]))
test_df = spark.createDataFrame(test_Data, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass with Conditions'")
predict_Df = model.transform(test_df)
predict_Df.createOrReplaceTempView('Predictions')
predict_Df.columns
## Printing the first row
predict_Df.take(1)
## Counting the correct predictions
numOfSuccess = predict_Df.where("""(prediction = 0 AND results = 'Fail') OR
    (prediction = 1 AND (results = 'Pass' OR
    results = 'Pass with Conditions'))""").count()
numOfInspections = predict_Df.count()
print("There were", numOfInspections, "inspections and there were", numOfSuccess, "successful predictions")
print("This is a", str((float(numOfSuccess) / float(numOfInspections)) * 100) + "%", "success rate")
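The success rate computed at the end is simply the share of inspections where the predicted label matches the actual result. Here is the same arithmetic as a small plain-Python sketch on made-up values. The `success_rate` function and the sample lists are hypothetical and are not taken from the Chicago dataset.

```python
def success_rate(predictions, results):
    """Percentage of predictions that match the actual inspection result."""
    # Same mapping as in the labeling step: 0.0 is a failure, 1.0 is a success.
    expected = {"Fail": 0.0, "Pass": 1.0, "Pass with Conditions": 1.0}
    correct = sum(
        1 for predicted, actual in zip(predictions, results)
        if expected.get(actual) == predicted
    )
    return 100.0 * correct / len(results)


# Made-up predictions and actual results for four inspections.
predictions = [0.0, 1.0, 1.0, 0.0]
results = ["Fail", "Pass", "Fail", "Pass with Conditions"]
rate = success_rate(predictions, results)
```

In this toy sample the first two predictions are right and the last two are wrong, so the rate is 50%.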
With this, we come to the end of this blog on PySpark Programming. I hope it added some value to your knowledge.