
Spark & Python: SQL & DataFrames

Published Jul 03, 2015. Last updated Mar 21, 2017.

Instructions

My Spark & Python series of tutorials can be read individually, although they follow a more or less linear 'story' when read in sequence. They all use the same dataset to solve a related set of tasks.

A good way to follow these Spark tutorials (though not the only one) is to first clone the GitHub repo and then start your own IPython notebook in pySpark mode. For example, if we have a standalone Spark installation running on localhost with a maximum of 6 GB per node assigned to IPython:

MASTER="spark://127.0.0.1:7077" SPARK_EXECUTOR_MEMORY="6G" IPYTHON_OPTS="notebook --pylab inline" ~/spark-1.3.1-bin-hadoop2.6/bin/pyspark

Notice that the path to the pyspark command depends on your specific installation. As a requirement, Spark must be installed on the same machine where you are going to start the IPython notebook server.

For more Spark options see here. As a general rule, an option described in the form spark.executor.memory is passed as SPARK_EXECUTOR_MEMORY when calling IPython/pySpark.
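The naming rule above can be sketched in a couple of lines of Python. The helper name spark_option_to_env is hypothetical, and the snippet only mirrors the convention described in the text; whether a particular option is actually honoured as an environment variable depends on your Spark version.

```python
def spark_option_to_env(option):
    """Turn a dotted Spark option name into the upper-case variable form."""
    # Hypothetical helper: it only applies the naming rule from the text.
    return option.upper().replace(".", "_")


print(spark_option_to_env("spark.executor.memory"))
```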

Datasets

We will be using datasets from the KDD Cup 1999.

References

The reference book for these and other Spark related topics is Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.

The KDD Cup 1999 competition dataset is described in detail here.

Introduction

This tutorial introduces Spark's capabilities for dealing with data in a structured way. Basically, everything revolves around the concept of the DataFrame and the use of SQL to query it. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when it is used together with SQL. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.

Getting the Data and Creating the RDD

As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.

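import urllib  # Python 2; in Python 3 the function lives at urllib.request.urlretrieve

# Download the reduced (10 percent) dataset into the working directory
f = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")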


data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()
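If you re-run the notebook often, you may prefer not to download the file every time. The following is a minimal sketch, not part of the original notebook: fetch_once and peek are hypothetical helper names, and the import fallback is there so the same code runs on Python 2 and Python 3.

```python
import gzip
import os
from itertools import islice

try:
    from urllib.request import urlretrieve  # Python 3
except ImportError:
    from urllib import urlretrieve  # Python 2


def fetch_once(url, path):
    """Download url to path unless a file already exists at path."""
    if not os.path.exists(path):
        urlretrieve(url, path)
    return path


def peek(path, n=3):
    """Return up to the first n lines of a gzip-compressed text file."""
    with gzip.open(path, "rb") as handle:
        return [raw.decode("utf-8").rstrip("\r\n") for raw in islice(handle, n)]
```

Calling fetch_once with the dataset URL and "kddcup.data_10_percent.gz" as arguments, and then peek on the result, lets us look at a few raw lines before handing the file to sc.textFile.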

Getting a Data Frame

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. DataFrames can be constructed from a wide array of sources, such as an existing RDD in our case.

The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object sc for this purpose.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Inferring the Schema

With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema of our data.

Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.
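To see why a missing value in the first row is a problem, here is a toy illustration in plain Python. It is not Spark's actual inference code; infer_schema_from_first_row is a hypothetical function that only mimics the idea of deciding each column type from the first record.

```python
def infer_schema_from_first_row(rows):
    """Map each column of the first row to the name of its Python type."""
    first = rows[0]
    return {name: type(value).__name__ for name, value in first.items()}


# Made-up records, only meant to show the effect of a missing first value.
good = [{"duration": 0, "service": "http"}, {"duration": 12, "service": "smtp"}]
bad = [{"duration": None, "service": "http"}, {"duration": 12, "service": "smtp"}]

print(infer_schema_from_first_row(good))
print(infer_schema_from_first_row(bad))
```

In the second case the duration column has no usable type, even though every later row holds an integer.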

In our case, we first need to split the comma-separated data, and then use the information in the KDD Cup 1999 task description to obtain the column names.

from pyspark.sql import Row

csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)
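The parsing step can also be written as a plain function, which makes it easy to try on a single line without a cluster. This is a sketch under assumptions: FIELDS and parse_interaction are names introduced here for illustration, and the sample line is made up in the dataset's format and truncated to the first six fields.

```python
# Column name and conversion function for the first six comma-separated fields.
FIELDS = (
    ("duration", int),
    ("protocol_type", str),
    ("service", str),
    ("flag", str),
    ("src_bytes", int),
    ("dst_bytes", int),
)


def parse_interaction(line):
    """Split one CSV line and convert the first six fields to typed values."""
    parts = line.split(",")
    return {name: cast(parts[i]) for i, (name, cast) in enumerate(FIELDS)}


sample = "0,tcp,http,SF,181,5450"  # illustrative line, not taken from the file
print(parse_interaction(sample))
```

With Spark, the resulting dictionary can be expanded into a Row, for example with raw_data.map(lambda l: Row(**parse_interaction(l))).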

Once we have our RDD of Row objects we can infer and register the schema.

interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")

Now we can run SQL queries over our data frame that has been registered as a table.

# Select tcp network interactions lasting more than 1000 seconds with no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()
duration dst_bytes
    5057     0        
    5059     0        
    5051     0        
    5056     0        
    5051     0        
    5039     0        
    5062     0        
    5041     0        
    5056     0        
    5064     0        
    5043     0        
    5061     0        
    5049     0        
    5061     0        
    5048     0        
    5047     0        
    5044     0        
    5063     0        
    5068     0        
    5062     0        
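If you want to check what the WHERE clause keeps without starting Spark, the same SQL statement can be run against a tiny in-memory table. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL, and the four rows are made up for illustration.

```python
import sqlite3

# In-memory database standing in for the registered "interactions" table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE interactions ("
    "duration INTEGER, protocol_type TEXT, dst_bytes INTEGER)"
)
conn.executemany(
    "INSERT INTO interactions VALUES (?, ?, ?)",
    [
        (5057, "tcp", 0),    # kept: tcp, long duration, no bytes from destination
        (12, "tcp", 0),      # dropped: duration too short
        (5059, "tcp", 300),  # dropped: destination sent data
        (2000, "udp", 0),    # dropped: not tcp
    ],
)
rows = conn.execute(
    "SELECT duration, dst_bytes FROM interactions "
    "WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0"
).fetchall()
print(rows)
```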

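The results of SQL queries are DataFrames. In the Spark 1.3 release used here, they also support the normal RDD operations such as map. From Spark 2.0 onwards you must call .rdd on the result first.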

# Output duration together with dst_bytes
tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print ti_out
    Duration: 5057, Dest. bytes: 0  
    Duration: 5059, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5041, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5064, Dest. bytes: 0  
    Duration: 5043, Dest. bytes: 0  
    Duration: 5061, Dest. bytes: 0  
    Duration: 5049, Dest. bytes: 0  
    Duration: 5061, Dest. bytes: 0  
    Duration: 5048, Dest. bytes: 0  
    Duration: 5047, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5068, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5046, Dest. bytes: 0  
    Duration: 5052, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5054, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5058, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5032, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5040, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5066, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5036, Dest. bytes: 0  
    Duration: 5055, Dest. bytes: 0  
    Duration: 2426, Dest. bytes: 0  
    Duration: 5047, Dest. bytes: 0  
    Duration: 5057, Dest. bytes: 0  
    Duration: 5037, Dest. bytes: 0  
    Duration: 5057, Dest. bytes: 0  
    Duration: 5062, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5053, Dest. bytes: 0  
    Duration: 5064, Dest. bytes: 0  
    Duration: 5044, Dest. bytes: 0  
    Duration: 5051, Dest. bytes: 0  
    Duration: 5033, Dest. bytes: 0  
    Duration: 5066, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5042, Dest. bytes: 0  
    Duration: 5063, Dest. bytes: 0  
    Duration: 5060, Dest. bytes: 0  
    Duration: 5056, Dest. bytes: 0  
    Duration: 5049, Dest. bytes: 0  
    Duration: 5043, Dest. bytes: 0  
    Duration: 5039, Dest. bytes: 0  
    Duration: 5041, Dest. bytes: 0  
    Duration: 42448, Dest. bytes: 0  
    Duration: 42088, Dest. bytes: 0  
    Duration: 41065, Dest. bytes: 0  
    Duration: 40929, Dest. bytes: 0  
    Duration: 40806, Dest. bytes: 0  
    Duration: 40682, Dest. bytes: 0  
    Duration: 40571, Dest. bytes: 0  
    Duration: 40448, Dest. bytes: 0  
    Duration: 40339, Dest. bytes: 0  
    Duration: 40232, Dest. bytes: 0  
    Duration: 40121, Dest. bytes: 0  
    Duration: 36783, Dest. bytes: 0  
    Duration: 36674, Dest. bytes: 0  
    Duration: 36570, Dest. bytes: 0  
    Duration: 36467, Dest. bytes: 0  
    Duration: 36323, Dest. bytes: 0  
    Duration: 36204, Dest. bytes: 0  
    Duration: 32038, Dest. bytes: 0  
    Duration: 31925, Dest. bytes: 0  
    Duration: 31809, Dest. bytes: 0  
    Duration: 31709, Dest. bytes: 0  
    Duration: 31601, Dest. bytes: 0  
    Duration: 31501, Dest. bytes: 0  
    Duration: 31401, Dest. bytes: 0  
    Duration: 31301, Dest. bytes: 0  
    Duration: 31194, Dest. bytes: 0  
    Duration: 31061, Dest. bytes: 0  
    Duration: 30935, Dest. bytes: 0  
    Duration: 30835, Dest. bytes: 0  
    Duration: 30735, Dest. bytes: 0  
    Duration: 30619, Dest. bytes: 0  
    Duration: 30518, Dest. bytes: 0  
    Duration: 30418, Dest. bytes: 0  
    Duration: 30317, Dest. bytes: 0  
    Duration: 30217, Dest. bytes: 0  
    Duration: 30077, Dest. bytes: 0  
    Duration: 25420, Dest. bytes: 0  
    Duration: 22921, Dest. bytes: 0  
    Duration: 22821, Dest. bytes: 0  
    Duration: 22721, Dest. bytes: 0  
    Duration: 22616, Dest. bytes: 0  
    Duration: 22516, Dest. bytes: 0  
    Duration: 22416, Dest. bytes: 0  
    Duration: 22316, Dest. bytes: 0  
    Duration: 22216, Dest. bytes: 0  
    Duration: 21987, Dest. bytes: 0  
    Duration: 21887, Dest. bytes: 0  
    Duration: 21767, Dest. bytes: 0  
    Duration: 21661, Dest. bytes: 0  
    Duration: 21561, Dest. bytes: 0  
    Duration: 21455, Dest. bytes: 0  
    Duration: 21334, Dest. bytes: 0  
    Duration: 21223, Dest. bytes: 0  
    Duration: 21123, Dest. bytes: 0  
    Duration: 20983, Dest. bytes: 0  
    Duration: 14682, Dest. bytes: 0  
    Duration: 14420, Dest. bytes: 0  
    Duration: 14319, Dest. bytes: 0  
    Duration: 14198, Dest. bytes: 0  
    Duration: 14098, Dest. bytes: 0  
    Duration: 13998, Dest. bytes: 0  
    Duration: 13898, Dest. bytes: 0  
    Duration: 13796, Dest. bytes: 0  
    Duration: 13678, Dest. bytes: 0  
    Duration: 13578, Dest. bytes: 0  
    Duration: 13448, Dest. bytes: 0  
    Duration: 13348, Dest. bytes: 0  
    Duration: 13241, Dest. bytes: 0  
    Duration: 13141, Dest. bytes: 0  
    Duration: 13033, Dest. bytes: 0  
    Duration: 12933, Dest. bytes: 0  
    Duration: 12833, Dest. bytes: 0  
    Duration: 12733, Dest. bytes: 0  
    Duration: 12001, Dest. bytes: 0  
    Duration: 5678, Dest. bytes: 0  
    Duration: 5010, Dest. bytes: 0  
    Duration: 1298, Dest. bytes: 0  
    Duration: 1031, Dest. bytes: 0  
    Duration: 36438, Dest. bytes: 0  
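The formatting step can be pulled out of the lambda into a named function and tried locally. In this sketch, describe is a hypothetical name and the namedtuple only stands in for a Spark Row, which exposes its fields as attributes in the same way.

```python
from collections import namedtuple

# Stand-in for a Spark Row with the two selected columns.
Interaction = namedtuple("Interaction", ["duration", "dst_bytes"])


def describe(row):
    """Format one result row the same way as the lambda used above."""
    return "Duration: {}, Dest. bytes: {}".format(row.duration, row.dst_bytes)


print(describe(Interaction(duration=5057, dst_bytes=0)))
```

On Spark 2.0 or later, where a DataFrame no longer has a map method of its own, the function would be applied with tcp_interactions.rdd.map(describe).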

We can easily have a look at our data frame schema using printSchema.

interactions_df.printSchema()

root  
 |-- dst_bytes: long (nullable = true)  
 |-- duration: long (nullable = true)  
 |-- flag: string (nullable = true)  
 |-- protocol_type: string (nullable = true)  
 |-- service: string (nullable = true)  
 |-- src_bytes: long (nullable = true)  

Queries as DataFrame Operations

Spark DataFrame provides a domain-specific language for structured data manipulation. This language includes methods we can chain in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions there are for each protocol type. We can proceed as follows.

from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count   
udp           20354   
tcp           190065  
icmp          283602  
Query performed in 20.568 seconds  
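For readers who have not used groupBy before, the same "group and count" idea looks like this in plain Python on a handful of made-up records. This is only an illustration of the semantics; Spark performs the equivalent work in a distributed way.

```python
from collections import Counter

# Made-up records with the same three columns selected above.
records = [
    {"protocol_type": "tcp", "duration": 0, "dst_bytes": 5450},
    {"protocol_type": "tcp", "duration": 5057, "dst_bytes": 0},
    {"protocol_type": "udp", "duration": 0, "dst_bytes": 146},
    {"protocol_type": "icmp", "duration": 0, "dst_bytes": 0},
    {"protocol_type": "icmp", "duration": 0, "dst_bytes": 0},
]

# One counter entry per distinct protocol_type, like groupBy(...).count().
counts = Counter(record["protocol_type"] for record in records)
for protocol, count in sorted(counts.items()):
    print("{} {}".format(protocol, count))
```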

Now imagine that we want to count how many interactions last more than 1000 seconds (the duration field is measured in seconds), with no data transfer from destination, grouped by protocol type. We can just add two filter calls to the previous query.

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
protocol_type count  
tcp           139    
Query performed in 16.641 seconds  
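Chaining two filter calls keeps only the rows that satisfy both conditions, so it behaves like a single filter with the conditions joined by AND. A plain-Python sketch on made-up records (count_by_protocol is a hypothetical helper) shows the equivalence.

```python
from collections import Counter

# Made-up records, only to illustrate the filtering logic.
records = [
    {"protocol_type": "tcp", "duration": 5057, "dst_bytes": 0},
    {"protocol_type": "tcp", "duration": 12, "dst_bytes": 0},
    {"protocol_type": "udp", "duration": 2000, "dst_bytes": 100},
    {"protocol_type": "icmp", "duration": 0, "dst_bytes": 0},
]


def count_by_protocol(rows, *predicates):
    """Apply each predicate in turn, then count the surviving rows per protocol."""
    kept = rows
    for predicate in predicates:
        kept = [row for row in kept if predicate(row)]
    return Counter(row["protocol_type"] for row in kept)


chained = count_by_protocol(
    records,
    lambda r: r["duration"] > 1000,
    lambda r: r["dst_bytes"] == 0,
)
combined = count_by_protocol(
    records,
    lambda r: r["duration"] > 1000 and r["dst_bytes"] == 0,
)
print(chained == combined, dict(chained))
```

In PySpark the combined form is usually written with the & operator between two column expressions, each wrapped in parentheses.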

We can use this to perform some exploratory data analysis. Let's count how many attack and normal interactions we have. First we need to add the label column to our data.

def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)
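Note that labels in the file end with a dot, which is why the comparison is against "normal." rather than "normal". A quick local check of the labelling rule, using a compact variant with the hypothetical name label_type and a few label strings in the dataset's format, could look like this.

```python
def label_type(label):
    """Collapse every label other than 'normal.' into a single 'attack' class."""
    return "normal" if label == "normal." else "attack"


samples = ["normal.", "smurf.", "neptune.", "normal."]
print([label_type(label) for label in samples])
```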

This time we don't need to register the DataFrame as a table, since we are going to use the OO query interface.

Let's check that the previous step actually works by counting attack and normal data in our data frame.

t0 = time()
interactions_labeled_df.select("label").groupBy("label").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  count   
attack 396743  
normal 97278   
Query performed in 17.325 seconds  

Now we want to count them by label and protocol type, in order to see how important the protocol type is for detecting whether or not an interaction is an attack.

t0 = time()
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type count   
attack udp           1177    
attack tcp           113252  
attack icmp          282314  
normal udp           19177   
normal tcp           76813  
normal icmp          1288   
Query performed in 17.253 seconds 

At first sight, it seems that udp interactions make up a much smaller proportion of network attacks than the other protocol types.
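We can turn the counts above into proportions to make this observation concrete. The numbers below are copied from the table printed by the query; the two helper functions are names introduced here for illustration.

```python
# Counts copied from the label / protocol_type table above.
counts = {
    ("attack", "udp"): 1177,
    ("attack", "tcp"): 113252,
    ("attack", "icmp"): 282314,
    ("normal", "udp"): 19177,
    ("normal", "tcp"): 76813,
    ("normal", "icmp"): 1288,
}


def share_of_attacks(protocol):
    """Fraction of all attacks that use the given protocol."""
    total_attacks = sum(n for (label, _), n in counts.items() if label == "attack")
    return counts[("attack", protocol)] / float(total_attacks)


def attack_rate(protocol):
    """Fraction of the given protocol's interactions that are attacks."""
    attacks = counts[("attack", protocol)]
    return attacks / float(attacks + counts[("normal", protocol)])


for protocol in ("udp", "tcp", "icmp"):
    print("{}: {:.4f} of attacks, attack rate {:.4f}".format(
        protocol, share_of_attacks(protocol), attack_rate(protocol)))
```

Both views point the same way: udp accounts for a tiny share of the attacks, and only a small fraction of udp interactions are attacks.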

And we can do much more sophisticated groupings. For example, we can add to the previous query a "split" based on data transfer from the target.

t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
label  protocol_type (dst_bytes = 0) count  
normal icmp          true            1288   
attack udp           true            1166  
attack udp           false           11    
normal udp           true            3594  
normal udp           false           15583  
attack tcp           true            110583  
attack tcp           false           2669  
normal tcp           true            9313  
normal tcp           false           67500  
attack icmp          true            282314  
Query performed in 17.284 seconds  

We can see how relevant this new split is for determining whether a network interaction is an attack.
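To quantify the effect of the split, we can compute the fraction of attacks inside each group. The counts are copied from the table above; groups that do not appear in the query output are treated as empty. The function name attack_share is introduced here for illustration.

```python
# Counts copied from the table above; the key is (label, protocol, dst_bytes == 0).
split_counts = {
    ("attack", "tcp", True): 110583,
    ("attack", "tcp", False): 2669,
    ("normal", "tcp", True): 9313,
    ("normal", "tcp", False): 67500,
    ("attack", "udp", True): 1166,
    ("attack", "udp", False): 11,
    ("normal", "udp", True): 3594,
    ("normal", "udp", False): 15583,
    ("attack", "icmp", True): 282314,
    ("normal", "icmp", True): 1288,
}


def attack_share(protocol, no_dst_bytes):
    """Fraction of attacks in one (protocol, dst_bytes == 0) group, or None if empty."""
    attacks = split_counts.get(("attack", protocol, no_dst_bytes), 0)
    normals = split_counts.get(("normal", protocol, no_dst_bytes), 0)
    total = attacks + normals
    return attacks / float(total) if total else None


for protocol in ("tcp", "udp", "icmp"):
    for no_dst_bytes in (True, False):
        print(protocol, no_dst_bytes, attack_share(protocol, no_dst_bytes))
```

For tcp the difference is striking: most interactions with no bytes from the destination are attacks, while only a few percent of those with some data transfer are.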

We will stop here, but we can see how powerful this type of query is for exploring our data. Actually, we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, grouping, and filtering our DataFrame. For a more detailed (but less real-world) list of Spark's DataFrame operations and data sources, have a look at the official documentation here.

Discover and read more posts from Jose A Dianes
Comments
SHAMIT BAGCHI
7 years ago

You can’t map a dataframe, but you can convert the dataframe to an RDD and map that by doing spark_df.rdd.map(). Prior to Spark 2.0, spark_df.map would alias to spark_df.rdd.map(). With Spark 2.0, you must explicitly call .rdd first.

Shweta Gupta
7 years ago

Thank You for the tutorials.

As an update to pyspark setup in Version 2, the IPYTHON and IPYTHON_OPTS have been replaced by PYSPARK variables, followed by other changes.

Sane Joker
7 years ago

Hi,

Thanks for this post. It's really helpful. I am new to Spark and have a question. Is there any limitation on the amount of data (i.e. parquet or csv) that Spark can load using sc.parquet or sc.textfile? My current tables are almost 100Gb each and I need to register them as temp tables before executing SQL on them. Will either of the two situations above affect Spark performance? Any help is highly appreciated. Thanks.
